diff --git a/common/deliverclient/util.go b/common/deliverclient/util.go new file mode 100644 index 00000000000..b91473b10c2 --- /dev/null +++ b/common/deliverclient/util.go @@ -0,0 +1,55 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package deliverclient + +import ( + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/configtx" + "github.com/hyperledger/fabric/protoutil" + "github.com/pkg/errors" +) + +var ErrNotAConfig = errors.New("not a config block") + +// ConfigFromBlock returns a ConfigEnvelope if one exists, or the ErrNotAConfig sentinel error. +// It may also return some other error in case parsing fails. +func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) { + if block == nil || block.Data == nil || len(block.Data.Data) == 0 { + return nil, errors.New("empty block") + } + txn := block.Data.Data[0] + env, err := protoutil.GetEnvelopeFromBlock(txn) + if err != nil { + return nil, errors.WithStack(err) + } + payload, err := protoutil.UnmarshalPayload(env.Payload) + if err != nil { + return nil, errors.WithStack(err) + } + if block.Header.Number == 0 { + configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) + if err != nil { + return nil, errors.Wrap(err, "invalid config envelope") + } + return configEnvelope, nil + } + if payload.Header == nil { + return nil, errors.New("nil header in payload") + } + chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader) + if err != nil { + return nil, errors.WithStack(err) + } + if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG { + return nil, ErrNotAConfig + } + configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) + if err != nil { + return nil, errors.Wrap(err, "invalid config envelope") + } + return configEnvelope, nil +} diff --git a/common/deliverclient/util_test.go b/common/deliverclient/util_test.go new file mode 100644 index 00000000000..99c8d7d72ab --- 
/dev/null +++ b/common/deliverclient/util_test.go @@ -0,0 +1,106 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package deliverclient_test + +import ( + "testing" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/deliverclient" + "github.com/hyperledger/fabric/protoutil" + "github.com/stretchr/testify/require" +) + +func TestConfigFromBlockBadInput(t *testing.T) { + for _, testCase := range []struct { + name string + block *common.Block + expectedError string + }{ + { + name: "nil block", + expectedError: "empty block", + block: nil, + }, + { + name: "nil block data", + expectedError: "empty block", + block: &common.Block{}, + }, + { + name: "no data in block", + expectedError: "empty block", + block: &common.Block{Data: &common.BlockData{}}, + }, + { + name: "invalid payload", + expectedError: "error unmarshalling Envelope", + block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}}, + }, + { + name: "bad genesis block", + expectedError: "invalid config envelope", + block: &common.Block{ + Header: &common.BlockHeader{}, Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ + Payload: protoutil.MarshalOrPanic(&common.Payload{ + Data: []byte{1, 2, 3}, + }), + })}}, + }, + }, + { + name: "invalid envelope in block", + expectedError: "error unmarshalling Envelope", + block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}}, + }, + { + name: "invalid payload in block envelope", + expectedError: "error unmarshalling Payload", + block: &common.Block{Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ + Payload: []byte{1, 2, 3}, + })}}}, + }, + { + name: "invalid channel header", + expectedError: "error unmarshalling ChannelHeader", + block: &common.Block{ + Header: &common.BlockHeader{Number: 1}, + Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ + Payload: 
protoutil.MarshalOrPanic(&common.Payload{ + Header: &common.Header{ + ChannelHeader: []byte{1, 2, 3}, + }, + }), + })}}, + }, + }, + { + name: "invalid config block", + expectedError: "invalid config envelope", + block: &common.Block{ + Header: &common.BlockHeader{}, + Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ + Payload: protoutil.MarshalOrPanic(&common.Payload{ + Data: []byte{1, 2, 3}, + Header: &common.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{ + Type: int32(common.HeaderType_CONFIG), + }), + }, + }), + })}}, + }, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + conf, err := deliverclient.ConfigFromBlock(testCase.block) + require.Nil(t, conf) + require.Error(t, err) + require.Contains(t, err.Error(), testCase.expectedError) + }) + } +} diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 601f1e2497b..853b7e71c94 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -72,8 +72,9 @@ type Config struct { // Gossip enables to enumerate peers in the channel, send a message to peers, // and add a block to the gossip state transfer layer. Gossip blocksprovider.GossipServiceAdapter - // OrdererSource provides orderer endpoints, complete with TLS cert pools. - OrdererSource *orderers.ConnectionSource + // OrdererEndpointOverrides provides peer-specific orderer endpoints overrides. + // These are loaded once when the peer starts. + OrdererEndpointOverrides map[string]*orderers.Endpoint // Signer is the identity used to sign requests. Signer identity.SignerSerializer // DeliverServiceConfig is the configuration object. 
@@ -191,14 +192,15 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo SecOpts: d.conf.DeliverServiceConfig.SecOpts, }, }, - Orderers: d.conf.OrdererSource, - DoneC: make(chan struct{}), - Signer: d.conf.Signer, - DeliverStreamer: blocksprovider.DeliverAdapter{}, - Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), - MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, - MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold, - InitialRetryInterval: 100 * time.Millisecond, + OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides}, + CryptoProvider: d.conf.CryptoProvider, + DoneC: make(chan struct{}), + Signer: d.conf.Signer, + DeliverStreamer: blocksprovider.DeliverAdapter{}, + Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), + MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, + MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold, + InitialRetryInterval: 100 * time.Millisecond, MaxRetryDurationExceededHandler: func() (stopRetries bool) { return !d.conf.IsStaticLeader }, @@ -212,7 +214,7 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo dc.TLSCertHash = util.ComputeSHA256(cert.Certificate[0]) } - dc.Initialize() + dc.Initialize(d.conf.ChannelConfig) return dc, nil } @@ -254,7 +256,8 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo SecOpts: d.conf.DeliverServiceConfig.SecOpts, }, }, - Orderers: d.conf.OrdererSource, + OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides}, + CryptoProvider: d.conf.CryptoProvider, DoneC: make(chan struct{}), Signer: d.conf.Signer, DeliverStreamer: blocksprovider.DeliverAdapter{}, @@ -277,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo dcBFT.TLSCertHash = 
util.ComputeSHA256(cert.Certificate[0]) } - dcBFT.Initialize() + dcBFT.Initialize(d.conf.ChannelConfig) return dcBFT, nil } diff --git a/core/peer/peer.go b/core/peer/peer.go index a6dfeb7d189..d9cae6d01cc 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -288,28 +288,6 @@ func (p *Peer) createChannel( mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager()) } - osLogger := flogging.MustGetLogger("peer.orderers") - namedOSLogger := osLogger.With("channel", cid) - ordererSource := orderers.NewConnectionSource(namedOSLogger, p.OrdererEndpointOverrides) - - ordererSourceCallback := func(bundle *channelconfig.Bundle) { - globalAddresses := bundle.ChannelConfig().OrdererAddresses() - orgAddresses := map[string]orderers.OrdererOrg{} - if ordererConfig, ok := bundle.OrdererConfig(); ok { - for orgName, org := range ordererConfig.Organizations() { - var certs [][]byte - certs = append(certs, org.MSP().GetTLSRootCerts()...) - certs = append(certs, org.MSP().GetTLSIntermediateCerts()...) - - orgAddresses[orgName] = orderers.OrdererOrg{ - Addresses: org.Endpoints(), - RootCerts: certs, - } - } - } - ordererSource.Update(globalAddresses, orgAddresses) - } - channel := &Channel{ ledger: l, resources: bundle, @@ -317,7 +295,6 @@ func (p *Peer) createChannel( } callbacks := []channelconfig.BundleActor{ - ordererSourceCallback, gossipCallbackWrapper, trustedRootsCallbackWrapper, mspCallback, @@ -373,7 +350,7 @@ func (p *Peer) createChannel( p.GossipService.InitializeChannel( bundle.ConfigtxValidator().ChannelID(), - ordererSource, + p.OrdererEndpointOverrides, store, gossipservice.Support{ Validator: validator, diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 2f11d92beae..de2845a97ba 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -127,7 +127,7 @@ type GossipServiceAdapter interface { // DeliveryServiceFactory factory to create and initialize delivery service instance type DeliveryServiceFactory 
interface { // Returns an instance of delivery client - Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService + Service(g GossipServiceAdapter, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService } type deliveryFactoryImpl struct { @@ -139,21 +139,20 @@ type deliveryFactoryImpl struct { // Returns an instance of delivery service func (df *deliveryFactoryImpl) Service( g GossipServiceAdapter, - ordererSource *orderers.ConnectionSource, - mcs api.MessageCryptoService, // TODO remove + ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP, ) deliverservice.DeliverService { return deliverservice.NewDeliverService( &deliverservice.Config{ - IsStaticLeader: isStaticLead, - Gossip: g, - Signer: df.signer, - DeliverServiceConfig: df.deliverServiceConfig, - OrdererSource: ordererSource, - ChannelConfig: channelConfig, - CryptoProvider: cryptoProvider, + IsStaticLeader: isStaticLead, + Gossip: g, + Signer: df.signer, + DeliverServiceConfig: df.deliverServiceConfig, + OrdererEndpointOverrides: ordererEndpointOverrides, + ChannelConfig: channelConfig, + CryptoProvider: cryptoProvider, }) } @@ -334,7 +333,14 @@ type Support struct { } // InitializeChannel allocates the state provider and should be invoked once per channel per execution -func (g *GossipService) InitializeChannel(channelID string, ordererSource *orderers.ConnectionSource, store *transientstore.Store, support Support, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) { +func (g *GossipService) InitializeChannel( + channelID string, + ordererEndpointOverrides map[string]*orderers.Endpoint, + store *transientstore.Store, + support Support, + channelConfig *cb.Config, + cryptoProvider 
bccsp.BCCSP, +) { g.lock.Lock() defer g.lock.Unlock() // Initialize new state provider for given committer @@ -393,7 +399,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order blockingMode, stateConfig) if g.deliveryService[channelID] == nil { - g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider) + g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererEndpointOverrides, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider) } // Delivery service might be nil only if it was not able to get connected diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index e838c0b7d40..67410b3e5b5 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -20,7 +20,6 @@ import ( "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/bccsp/sw" "github.com/hyperledger/fabric/common/channelconfig" - "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/metrics/disabled" "github.com/hyperledger/fabric/core/deliverservice" "github.com/hyperledger/fabric/core/ledger" @@ -191,7 +190,7 @@ func TestLeaderElectionWithDeliverClient(t *testing.T) { gossips[i].deliveryFactory = deliverServiceFactory deliverServiceFactory.service.running = false - gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{ + gossips[i].InitializeChannel(channelName, nil, store.Store, Support{ Committer: &mockLedgerInfo{1}, }, nil, nil) service, exist := gossips[i].leaderElection[channelName] @@ -252,7 +251,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) { for i := 0; i < n; i++ { gossips[i].deliveryFactory = deliverServiceFactory deliverServiceFactory.service.running = false - gossips[i].InitializeChannel(channelName, 
orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{ + gossips[i].InitializeChannel(channelName, nil, store.Store, Support{ Committer: &mockLedgerInfo{1}, }, nil, nil) } @@ -265,7 +264,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) { channelName = "chanB" for i := 0; i < n; i++ { deliverServiceFactory.service.running = false - gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{ + gossips[i].InitializeChannel(channelName, nil, store.Store, Support{ Committer: &mockLedgerInfo{1}, }, nil, nil) } @@ -309,7 +308,7 @@ func TestWithStaticDeliverClientNotLeader(t *testing.T) { for i := 0; i < n; i++ { gossips[i].deliveryFactory = deliverServiceFactory deliverServiceFactory.service.running = false - gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{ + gossips[i].InitializeChannel(channelName, nil, store.Store, Support{ Committer: &mockLedgerInfo{1}, }, nil, nil) } @@ -354,7 +353,7 @@ func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) { for i := 0; i < n; i++ { gossips[i].deliveryFactory = deliverServiceFactory require.Panics(t, func() { - gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{ + gossips[i].InitializeChannel(channelName, nil, store.Store, Support{ Committer: &mockLedgerInfo{1}, }, nil, nil) }, "Dynamic leader election based and static connection to ordering service can't exist simultaneously") @@ -367,7 +366,7 @@ type mockDeliverServiceFactory struct { service *mockDeliverService } -func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, *orderers.ConnectionSource, api.MessageCryptoService, bool, *common.Config, bccsp.BCCSP) deliverservice.DeliverService { +func (mf *mockDeliverServiceFactory) 
Service(GossipServiceAdapter, map[string]*orderers.Endpoint, bool, *common.Config, bccsp.BCCSP) deliverservice.DeliverService { return mf.service } @@ -419,7 +418,7 @@ func (li *mockLedgerInfo) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.BlockA } func (li *mockLedgerInfo) GetCurrentBlockHash() ([]byte, error) { - panic("implement me") + return []byte{1, 2, 3, 4}, nil } // LedgerHeight returns mocked value to the ledger height @@ -905,7 +904,7 @@ func TestInvalidInitialization(t *testing.T) { go grpcServer.Serve(socket) defer grpcServer.Stop() - dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), &naiveCryptoService{}, false, nil, nil) + dc := gService.deliveryFactory.Service(gService, nil, false, nil, nil) require.NotNil(t, dc) } diff --git a/gossip/service/integration_test.go b/gossip/service/integration_test.go index a4e0d0f6e0f..c5c904c874a 100644 --- a/gossip/service/integration_test.go +++ b/gossip/service/integration_test.go @@ -20,10 +20,8 @@ import ( "github.com/hyperledger/fabric/bccsp/sw" "github.com/hyperledger/fabric/common/channelconfig" "github.com/hyperledger/fabric/common/crypto/tlsgen" - "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/core/config/configtest" "github.com/hyperledger/fabric/core/deliverservice" - "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/election" "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/internal/configtxgen/encoder" @@ -81,8 +79,8 @@ type embeddingDeliveryServiceFactory struct { DeliveryServiceFactory } -func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *common.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService { - ds := edsf.DeliveryServiceFactory.Service(g, ordererSource, mcs, false, channelConfig, 
cryptoProvider) +func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *common.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService { + ds := edsf.DeliveryServiceFactory.Service(g, nil, false, channelConfig, cryptoProvider) return newEmbeddingDeliveryService(ds) } @@ -150,16 +148,9 @@ func TestLeaderYield(t *testing.T) { }, } - gs.InitializeChannel( - channelName, - orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), - store.Store, - Support{ - Committer: &mockLedgerInfo{1}, - }, - channelConfigProto, - cryptoProvider, - ) + gs.InitializeChannel(channelName, nil, store.Store, Support{ + Committer: &mockLedgerInfo{1}, + }, channelConfigProto, cryptoProvider) return gs } diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer.go b/internal/pkg/peer/blocksprovider/bft_deliverer.go index ff81a942761..a293f5b7f38 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer.go @@ -13,6 +13,7 @@ import ( "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/bccsp" "github.com/hyperledger/fabric/common/deliverclient" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/internal/pkg/identity" @@ -55,13 +56,14 @@ type UpdatableBlockVerifier deliverclient.CloneableUpdatableBlockVerifier // will monitor their progress relative to the block fetcher. If a censorship suspicion is detected, the BFTDeliverer // will try to find another orderer to fetch from. 
type BFTDeliverer struct { - ChannelID string - BlockHandler BlockHandler - Ledger LedgerInfo - ChannelConfig *common.Config + ChannelID string + BlockHandler BlockHandler + Ledger LedgerInfo + UpdatableBlockVerifier UpdatableBlockVerifier Dialer Dialer - Orderers OrdererConnectionSource + OrderersSourceFactory OrdererConnectionSourceFactory + CryptoProvider bccsp.BCCSP DoneC chan struct{} Signer identity.SignerSerializer DeliverStreamer DeliverStreamer @@ -86,12 +88,12 @@ type BFTDeliverer struct { sleeper sleeper requester *DeliveryRequester + orderers OrdererConnectionSource mutex sync.Mutex // mutex protects the following fields stopFlag bool // mark the Deliverer as stopped nextBlockNumber uint64 // next block number lastBlockTime time.Time // last block time - lastBlockSourceIndex int // the source index of the last block we got, or -1 fetchFailureCounter int // counts the number of consecutive failures to fetch a block fetchFailureTotalSleepDuration time.Duration // the cumulative sleep time from when fetchFailureCounter goes 0->1 @@ -103,7 +105,7 @@ type BFTDeliverer struct { censorshipMonitor CensorshipDetector } -func (d *BFTDeliverer) Initialize() { +func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { d.requester = NewDeliveryRequester( d.ChannelID, d.Signer, @@ -111,6 +113,16 @@ func (d *BFTDeliverer) Initialize() { d.Dialer, d.DeliverStreamer, ) + + osLogger := flogging.MustGetLogger("peer.orderers") + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) + if err != nil { + // The bundle was created prior to calling this function, so it should not fail when we recreate it here. 
+ d.Logger.Panicf("Bundle creation should not have failed: %s", err) + } + ordererSource.Update(globalAddresses, orgAddresses) + d.orderers = ordererSource } func (d *BFTDeliverer) BlockProgress() (uint64, time.Time) { @@ -125,18 +137,15 @@ func (d *BFTDeliverer) BlockProgress() (uint64, time.Time) { } func (d *BFTDeliverer) DeliverBlocks() { - var err error - - d.lastBlockSourceIndex = -1 - d.lastBlockTime = time.Now() - d.nextBlockNumber, err = d.Ledger.LedgerHeight() - if err != nil { - d.Logger.Error("Did not return ledger height, something is critically wrong", err) + if err := d.initDeliverBlocks(); err != nil { + d.Logger.Errorf("Failed to start DeliverBlocks: %s", err) return } - d.Logger.Infof("Starting to DeliverBlocks on channel `%s`, block height=%d", d.ChannelID, d.nextBlockNumber) defer func() { + d.mutex.Lock() + defer d.mutex.Unlock() + d.Logger.Infof("Stopping to DeliverBlocks on channel `%s`, block height=%d", d.ChannelID, d.nextBlockNumber) }() @@ -193,6 +202,22 @@ func (d *BFTDeliverer) DeliverBlocks() { } } +func (d *BFTDeliverer) initDeliverBlocks() (err error) { + d.mutex.Lock() + defer d.mutex.Unlock() + + d.lastBlockTime = time.Now() + d.nextBlockNumber, err = d.Ledger.LedgerHeight() + if err != nil { + d.Logger.Errorf("Did not return ledger height, something is critically wrong: %s", err) + return + } + + d.Logger.Infof("Starting to DeliverBlocks on channel `%s`, block height=%d", d.ChannelID, d.nextBlockNumber) + + return nil +} + // retryBackoff computes the backoff duration and wait before retrying. // The backoff duration is doubled with every failed round. // A failed round is when we had moved through all the endpoints without success. 
@@ -282,7 +307,7 @@ func (d *BFTDeliverer) handleFetchAndCensorshipEvents() (stopLoop bool) { func (d *BFTDeliverer) refreshSources() { // select an initial source randomly - d.fetchSources = d.Orderers.ShuffledEndpoints() + d.fetchSources = d.orderers.ShuffledEndpoints() d.Logger.Infof("Refreshed endpoints: %s", d.fetchSources) d.fetchSourceIndex = 0 } @@ -364,8 +389,8 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { } } -func (d *BFTDeliverer) onBlockProcessingSuccess(blockNum uint64) { - d.Logger.Debugf("blockNum: %d", blockNum) +func (d *BFTDeliverer) onBlockProcessingSuccess(blockNum uint64, channelConfig *common.Config) { + d.Logger.Debugf("onBlockProcessingSuccess: %d, %v", blockNum, channelConfig) d.mutex.Lock() defer d.mutex.Unlock() @@ -375,6 +400,19 @@ func (d *BFTDeliverer) onBlockProcessingSuccess(blockNum uint64) { d.nextBlockNumber = blockNum + 1 d.lastBlockTime = time.Now() + + if channelConfig != nil { + globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) + if err != nil { + // The bundle was created prior to calling this function, so it should not fail when we recreate it here. 
+ d.Logger.Panicf("Bundle creation should not have failed: %s", err) + } + d.Logger.Debugf("Extracted orderer addresses: global %v, orgs: %v", globalAddresses, orgAddresses) + d.orderers.Update(globalAddresses, orgAddresses) + d.Logger.Debugf("Updated OrdererConnectionSource") + } + + d.Logger.Debug("onBlockProcessingSuccess: exit") } func (d *BFTDeliverer) resetFetchFailureCounter() { diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go index 053797b8f54..5f4944d860a 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go @@ -17,6 +17,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/bccsp" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider/fake" @@ -35,18 +36,21 @@ type bftDelivererTestSetup struct { gWithT *WithT d *blocksprovider.BFTDeliverer - fakeDialer *fake.Dialer - fakeBlockHandler *fake.BlockHandler - fakeOrdererConnectionSource *fake.OrdererConnectionSource - fakeLedgerInfo *fake.LedgerInfo - fakeUpdatableBlockVerifier *fake.UpdatableBlockVerifier - fakeSigner *fake.Signer - fakeDeliverStreamer *fake.DeliverStreamer - fakeDeliverClient *fake.DeliverClient - fakeCensorshipMonFactory *fake.CensorshipDetectorFactory - fakeCensorshipMon *fake.CensorshipDetector - fakeSleeper *fake.Sleeper - fakeDurationExceededHandler *fake.DurationExceededHandler + fakeDialer *fake.Dialer + fakeBlockHandler *fake.BlockHandler + fakeOrdererConnectionSource *fake.OrdererConnectionSource + fakeOrdererConnectionSourceFactory *fake.OrdererConnectionSourceFactory + fakeLedgerInfo *fake.LedgerInfo + fakeUpdatableBlockVerifier *fake.UpdatableBlockVerifier + fakeSigner *fake.Signer + 
fakeDeliverStreamer *fake.DeliverStreamer + fakeDeliverClient *fake.DeliverClient + fakeCensorshipMonFactory *fake.CensorshipDetectorFactory + fakeCensorshipMon *fake.CensorshipDetector + fakeSleeper *fake.Sleeper + fakeDurationExceededHandler *fake.DurationExceededHandler + fakeCryptoProvider bccsp.BCCSP + channelConfig *common.Config deliverClientDoneC chan struct{} // signals the deliverClient to exit recvStepC chan *orderer.DeliverResponse @@ -63,32 +67,34 @@ type bftDelivererTestSetup struct { func newBFTDelivererTestSetup(t *testing.T) *bftDelivererTestSetup { s := &bftDelivererTestSetup{ - gWithT: NewWithT(t), - fakeDialer: &fake.Dialer{}, - fakeBlockHandler: &fake.BlockHandler{}, - fakeOrdererConnectionSource: &fake.OrdererConnectionSource{}, - fakeLedgerInfo: &fake.LedgerInfo{}, - fakeUpdatableBlockVerifier: &fake.UpdatableBlockVerifier{}, - fakeSigner: &fake.Signer{}, - fakeDeliverStreamer: &fake.DeliverStreamer{}, - fakeDeliverClient: &fake.DeliverClient{}, - fakeCensorshipMonFactory: &fake.CensorshipDetectorFactory{}, - fakeSleeper: &fake.Sleeper{}, - fakeDurationExceededHandler: &fake.DurationExceededHandler{}, - deliverClientDoneC: make(chan struct{}), - recvStepC: make(chan *orderer.DeliverResponse), - endC: make(chan struct{}), + gWithT: NewWithT(t), + fakeDialer: &fake.Dialer{}, + fakeBlockHandler: &fake.BlockHandler{}, + fakeOrdererConnectionSource: &fake.OrdererConnectionSource{}, + fakeOrdererConnectionSourceFactory: &fake.OrdererConnectionSourceFactory{}, + fakeLedgerInfo: &fake.LedgerInfo{}, + fakeUpdatableBlockVerifier: &fake.UpdatableBlockVerifier{}, + fakeSigner: &fake.Signer{}, + fakeDeliverStreamer: &fake.DeliverStreamer{}, + fakeDeliverClient: &fake.DeliverClient{}, + fakeCensorshipMonFactory: &fake.CensorshipDetectorFactory{}, + fakeSleeper: &fake.Sleeper{}, + fakeDurationExceededHandler: &fake.DurationExceededHandler{}, + deliverClientDoneC: make(chan struct{}), + recvStepC: make(chan *orderer.DeliverResponse), + endC: make(chan 
struct{}), } return s } func (s *bftDelivererTestSetup) initialize(t *testing.T) { + tempDir := t.TempDir() s.fakeDialer.DialStub = func(string, [][]byte) (*grpc.ClientConn, error) { s.mutex.Lock() defer s.mutex.Unlock() - cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6005", grpc.WithTransportCredentials(insecure.NewCredentials())) s.clientConnSet = append(s.clientConnSet, cc) require.NoError(t, err) require.NotEqual(t, connectivity.Shutdown, cc.GetState()) @@ -124,6 +130,8 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) { } s.fakeOrdererConnectionSource.ShuffledEndpointsReturns(sources) + s.fakeOrdererConnectionSourceFactory.CreateConnectionSourceReturns(s.fakeOrdererConnectionSource) + s.fakeSigner.SignReturns([]byte("good-sig"), nil) s.fakeDeliverClient.RecvStub = func() (*orderer.DeliverResponse, error) { @@ -196,13 +204,18 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) { return mon }) + var err error + s.channelConfig, s.fakeCryptoProvider, err = testSetup(tempDir, "BFT") + require.NoError(t, err) + s.d = &blocksprovider.BFTDeliverer{ ChannelID: "channel-id", BlockHandler: s.fakeBlockHandler, Ledger: s.fakeLedgerInfo, UpdatableBlockVerifier: s.fakeUpdatableBlockVerifier, Dialer: s.fakeDialer, - Orderers: s.fakeOrdererConnectionSource, + OrderersSourceFactory: s.fakeOrdererConnectionSourceFactory, + CryptoProvider: s.fakeCryptoProvider, DoneC: make(chan struct{}), Signer: s.fakeSigner, DeliverStreamer: s.fakeDeliverStreamer, @@ -215,7 +228,7 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) { MaxRetryDuration: 600 * time.Second, MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler, } - s.d.Initialize() + s.d.Initialize(s.channelConfig) s.fakeSleeper = &fake.Sleeper{} @@ -374,7 +387,7 @@ func TestBFTDeliverer_DialRetries(t *testing.T) { setup.initialize(t) setup.fakeDialer.DialReturnsOnCall(0, nil, 
fmt.Errorf("fake-dial-error")) - cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6005", grpc.WithTransportCredentials(insecure.NewCredentials())) setup.gWithT.Expect(err).NotTo(HaveOccurred()) setup.fakeDialer.DialReturnsOnCall(1, cc, nil) @@ -408,7 +421,7 @@ func TestBFTDeliverer_DialRetries(t *testing.T) { setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) } - cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6005", grpc.WithTransportCredentials(insecure.NewCredentials())) setup.gWithT.Expect(err).NotTo(HaveOccurred()) setup.fakeDialer.DialReturnsOnCall(24, cc, nil) @@ -802,7 +815,7 @@ func TestBFTDeliverer_BlockReception(t *testing.T) { setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) } // success - cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6005", grpc.WithTransportCredentials(insecure.NewCredentials())) setup.gWithT.Expect(err).NotTo(HaveOccurred()) require.NotNil(t, cc) setup.fakeDialer.DialReturns(cc, nil) @@ -837,9 +850,10 @@ func TestBFTDeliverer_BlockReception(t *testing.T) { require.True(t, proto.Equal(block2, &common.Block{Header: &common.BlockHeader{Number: 7}})) t.Log("block progress is reported correctly") - bNum, bTime := setup.d.BlockProgress() - require.Equal(t, uint64(7), bNum) - require.True(t, bTime.After(startTime)) + require.Eventually(t, func() bool { + bNum, bTime := setup.d.BlockProgress() + return uint64(7) == bNum && bTime.After(startTime) + }, 5*time.Second, 10*time.Millisecond) setup.gWithT.Expect(setup.fakeDialer.DialCallCount()).Should(Equal(25)) @@ -880,7 +894,7 @@ func TestBFTDeliverer_BlockReception(t *testing.T) { } // success at attempt 80, 160 and >=240, should reset total sleep time - cc, err := grpc.Dial("localhost", 
grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6005", grpc.WithTransportCredentials(insecure.NewCredentials())) setup.gWithT.Expect(err).NotTo(HaveOccurred()) require.NotNil(t, cc) setup.fakeDialer.DialReturns(cc, nil) @@ -931,10 +945,11 @@ func TestBFTDeliverer_BlockReception(t *testing.T) { setup.stop() }) - t.Run("Config block is valid, updates verifier", func(t *testing.T) { + t.Run("Config block is valid, updates verifier, updates connection-source", func(t *testing.T) { flogging.ActivateSpec("debug") setup := newBFTDelivererTestSetup(t) setup.initialize(t) + setup.gWithT.Eventually(setup.fakeOrdererConnectionSource.UpdateCallCount).Should(Equal(1)) startTime := time.Now() t.Log("block progress is reported correctly before start") @@ -958,7 +973,9 @@ func TestBFTDeliverer_BlockReception(t *testing.T) { ChannelId: "test-chain", }), }, - Data: []byte("test bytes"), + Data: protoutil.MarshalOrPanic(&common.ConfigEnvelope{ + Config: setup.channelConfig, // it must be a legal config that can produce a new bundle + }), }), } @@ -1003,12 +1020,16 @@ func TestBFTDeliverer_BlockReception(t *testing.T) { })) t.Log("update config on verifier") - require.Equal(t, 1, setup.fakeUpdatableBlockVerifier.UpdateConfigCallCount()) + setup.gWithT.Eventually(setup.fakeUpdatableBlockVerifier.UpdateConfigCallCount).Should(Equal(1)) t.Log("block progress is reported correctly") - bNum2, bTime2 := setup.d.BlockProgress() - require.Equal(t, uint64(7), bNum2) - require.True(t, bTime2.After(bTime)) + require.Eventually(t, func() bool { + bNum2, bTime2 := setup.d.BlockProgress() + return uint64(7) == bNum2 && bTime2.After(bTime) + }, 5*time.Second, 100*time.Millisecond) + + t.Log("updated orderer source") + setup.gWithT.Eventually(setup.fakeOrdererConnectionSource.UpdateCallCount).Should(Equal(2)) setup.stop() }) diff --git a/internal/pkg/peer/blocksprovider/block_receiver.go b/internal/pkg/peer/blocksprovider/block_receiver.go index 
04cf6e51257..751ce815c57 100644 --- a/internal/pkg/peer/blocksprovider/block_receiver.go +++ b/internal/pkg/peer/blocksprovider/block_receiver.go @@ -10,12 +10,12 @@ import ( "fmt" "sync" - "github.com/hyperledger/fabric/protoutil" - "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/common/deliverclient" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" ) @@ -87,7 +87,7 @@ func (br *BlockReceiver) Stop() { } // ProcessIncoming processes incoming messages until stopped or encounters an error. -func (br *BlockReceiver) ProcessIncoming(onSuccess func(blockNum uint64)) error { +func (br *BlockReceiver) ProcessIncoming(onSuccess func(blockNum uint64, channelConfig *common.Config)) error { var err error RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the connection @@ -104,13 +104,14 @@ RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the c break RecvLoop } var blockNum uint64 - blockNum, err = br.processMsg(response) + var channelConfig *common.Config + blockNum, channelConfig, err = br.processMsg(response) if err != nil { br.logger.Warningf("Got error while attempting to receive blocks: %v", err) err = errors.WithMessagef(err, "got error while attempting to receive blocks from orderer `%s`", br.endpoint.Address) break RecvLoop } - onSuccess(blockNum) + onSuccess(blockNum, channelConfig) case <-br.stopC: br.logger.Infof("BlockReceiver got a signal to stop") err = &ErrStopping{Message: "got a signal to stop"} @@ -125,39 +126,48 @@ RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the c return err } -func (br *BlockReceiver) processMsg(msg *orderer.DeliverResponse) (uint64, error) { +func (br *BlockReceiver) processMsg(msg *orderer.DeliverResponse) (uint64, *common.Config, error) { switch t 
:= msg.GetType().(type) { case *orderer.DeliverResponse_Status: if t.Status == common.Status_SUCCESS { - return 0, errors.Errorf("received success for a seek that should never complete") + return 0, nil, errors.Errorf("received success for a seek that should never complete") } - return 0, errors.Errorf("received bad status %v from orderer", t.Status) + return 0, nil, errors.Errorf("received bad status %v from orderer", t.Status) case *orderer.DeliverResponse_Block: blockNum := t.Block.Header.Number if err := br.updatableBlockVerifier.VerifyBlock(t.Block); err != nil { - return 0, errors.WithMessagef(err, "block [%d] from orderer [%s] could not be verified", blockNum, br.endpoint.String()) + return 0, nil, errors.WithMessagef(err, "block [%d] from orderer [%s] could not be verified", blockNum, br.endpoint.String()) } err := br.blockHandler.HandleBlock(br.channelID, t.Block) if err != nil { - return 0, errors.WithMessagef(err, "block [%d] from orderer [%s] could not be handled", blockNum, br.endpoint.String()) + return 0, nil, errors.WithMessagef(err, "block [%d] from orderer [%s] could not be handled", blockNum, br.endpoint.String()) } br.logger.Debugf("Handled block %d", blockNum) + var channelConfig *common.Config if protoutil.IsConfigBlock(t.Block) { + configEnv, err := deliverclient.ConfigFromBlock(t.Block) + if err != nil { + return 0, nil, errors.WithMessagef(err, "failed to extract channel-config from config block [%d] from orderer [%s]", blockNum, br.endpoint.String()) + } + + channelConfig = configEnv.GetConfig() + br.logger.Debugf("channel config: %+v", channelConfig) + if err := br.updatableBlockVerifier.UpdateConfig(t.Block); err != nil { - return 0, errors.WithMessagef(err, "config block [%d] from orderer [%s] failed to update block verifier", blockNum, br.endpoint.String()) + return 0, nil, errors.WithMessagef(err, "config block [%d] from orderer [%s] failed to update block verifier", blockNum, br.endpoint.String()) } br.logger.Infof("Updated config 
block %d", blockNum) } br.updatableBlockVerifier.UpdateBlockHeader(t.Block) - return blockNum, nil + return blockNum, channelConfig, nil default: - return 0, errors.Errorf("unknown message type: %T, message: %+v", t, msg) + return 0, nil, errors.Errorf("unknown message type: %T, message: %+v", t, msg) } } diff --git a/internal/pkg/peer/blocksprovider/deliverer.go b/internal/pkg/peer/blocksprovider/deliverer.go index e6731a90418..3a2295b0198 100644 --- a/internal/pkg/peer/blocksprovider/deliverer.go +++ b/internal/pkg/peer/blocksprovider/deliverer.go @@ -12,8 +12,10 @@ import ( "sync" "time" + cb "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/gossip" "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/bccsp" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/internal/pkg/identity" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" @@ -44,11 +46,11 @@ type GossipServiceAdapter interface { Gossip(msg *gossip.GossipMessage) } +//go:generate counterfeiter -o fake/orderer_connection_source_factory.go --fake-name OrdererConnectionSourceFactory . OrdererConnectionSourceFactory +type OrdererConnectionSourceFactory orderers.ConnectionSourceCreator + //go:generate counterfeiter -o fake/orderer_connection_source.go --fake-name OrdererConnectionSource . OrdererConnectionSource -type OrdererConnectionSource interface { - RandomEndpoint() (*orderers.Endpoint, error) - ShuffledEndpoints() []*orderers.Endpoint -} +type OrdererConnectionSource orderers.ConnectionSourcer //go:generate counterfeiter -o fake/dialer.go --fake-name Dialer . 
Dialer type Dialer interface { @@ -76,7 +78,8 @@ type Deliverer struct { Ledger LedgerInfo UpdatableBlockVerifier UpdatableBlockVerifier Dialer Dialer - Orderers OrdererConnectionSource + OrderersSourceFactory OrdererConnectionSourceFactory + CryptoProvider bccsp.BCCSP DoneC chan struct{} Signer identity.SignerSerializer DeliverStreamer DeliverStreamer @@ -97,13 +100,14 @@ type Deliverer struct { sleeper sleeper requester *DeliveryRequester + orderers OrdererConnectionSource mutex sync.Mutex stopFlag bool blockReceiver *BlockReceiver } -func (d *Deliverer) Initialize() { +func (d *Deliverer) Initialize(channelConfig *cb.Config) { d.requester = NewDeliveryRequester( d.ChannelID, d.Signer, @@ -111,6 +115,16 @@ func (d *Deliverer) Initialize() { d.Dialer, d.DeliverStreamer, ) + + osLogger := flogging.MustGetLogger("peer.orderers") + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) + if err != nil { + // The bundle was created prior to calling this function, so it should not fail when we recreate it here. 
+ d.Logger.Panicf("Bundle creation should not have failed: %s", err) + } + ordererSource.Update(globalAddresses, orgAddresses) + d.orderers = ordererSource } // DeliverBlocks used to pull out blocks from the ordering service to distribute them across peers @@ -155,7 +169,7 @@ func (d *Deliverer) DeliverBlocks() { return } - endpoint, err := d.Orderers.RandomEndpoint() + endpoint, err := d.orderers.RandomEndpoint() if err != nil { d.Logger.Warningf("Could not connect to ordering service: could not get orderer endpoints: %s", err) failureCounter++ @@ -191,9 +205,18 @@ func (d *Deliverer) DeliverBlocks() { d.mutex.Unlock() blockReceiver.Start() // starts an internal goroutine - onSuccess := func(blockNum uint64) { + onSuccess := func(blockNum uint64, channelConfig *cb.Config) { failureCounter = 0 totalDuration = time.Duration(0) + + if channelConfig != nil { + globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) + if err != nil { + // The bundle was created prior to calling this function, so it should not fail when we recreate it here. 
+ d.Logger.Panicf("Bundle creation should not have failed: %s", err) + } + d.orderers.Update(globalAddresses, orgAddresses) + } } if err := blockReceiver.ProcessIncoming(onSuccess); err != nil { switch err.(type) { diff --git a/internal/pkg/peer/blocksprovider/deliverer_test.go b/internal/pkg/peer/blocksprovider/deliverer_test.go index 3ee3c95a706..d3009ad9188 100644 --- a/internal/pkg/peer/blocksprovider/deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/deliverer_test.go @@ -8,18 +8,26 @@ package blocksprovider_test import ( "fmt" + "io/ioutil" + "os" + "path" "sync" "time" - "github.com/hyperledger/fabric/protoutil" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/bccsp" + "github.com/hyperledger/fabric/bccsp/sw" + "github.com/hyperledger/fabric/common/crypto/tlsgen" "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/core/config/configtest" + "github.com/hyperledger/fabric/internal/configtxgen/encoder" + "github.com/hyperledger/fabric/internal/configtxgen/genesisconfig" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider/fake" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "google.golang.org/grpc" @@ -29,25 +37,33 @@ import ( var _ = Describe("CFT-Deliverer", func() { var ( - d *blocksprovider.Deliverer - ccs []*grpc.ClientConn - fakeDialer *fake.Dialer - fakeBlockHandler *fake.BlockHandler - fakeOrdererConnectionSource *fake.OrdererConnectionSource - fakeLedgerInfo *fake.LedgerInfo - fakeUpdatableBlockVerifier *fake.UpdatableBlockVerifier - fakeSigner *fake.Signer - fakeDeliverStreamer *fake.DeliverStreamer - fakeDeliverClient *fake.DeliverClient - fakeSleeper *fake.Sleeper - fakeDurationExceededHandler *fake.DurationExceededHandler - doneC chan struct{} - recvStep chan struct{} - endC chan struct{} - mutex sync.Mutex + d *blocksprovider.Deliverer + ccs []*grpc.ClientConn + fakeDialer *fake.Dialer + fakeBlockHandler *fake.BlockHandler + fakeOrdererConnectionSource *fake.OrdererConnectionSource + fakeOrdererConnectionSourceFactory *fake.OrdererConnectionSourceFactory + fakeLedgerInfo *fake.LedgerInfo + fakeUpdatableBlockVerifier *fake.UpdatableBlockVerifier + fakeSigner *fake.Signer + fakeDeliverStreamer *fake.DeliverStreamer + fakeDeliverClient *fake.DeliverClient + fakeSleeper *fake.Sleeper + fakeDurationExceededHandler *fake.DurationExceededHandler + fakeCryptoProvider bccsp.BCCSP + doneC chan struct{} + recvStep chan struct{} + endC chan struct{} + mutex sync.Mutex + tempDir string + channelConfig *common.Config ) BeforeEach(func() { + var err error + tempDir, err = os.MkdirTemp("", "deliverer") + Expect(err).NotTo(HaveOccurred()) + doneC = make(chan struct{}) recvStep = make(chan struct{}) @@ -60,7 +76,7 @@ var _ = Describe("CFT-Deliverer", func() { fakeDialer.DialStub = func(string, [][]byte) (*grpc.ClientConn, error) { mutex.Lock() defer mutex.Unlock() - cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6006", grpc.WithTransportCredentials(insecure.NewCredentials())) ccs = append(ccs, cc) Expect(err).NotTo(HaveOccurred()) 
Expect(cc.GetState()).NotTo(Equal(connectivity.Shutdown)) @@ -79,6 +95,9 @@ var _ = Describe("CFT-Deliverer", func() { Address: "orderer-address", }, nil) + fakeOrdererConnectionSourceFactory = &fake.OrdererConnectionSourceFactory{} + fakeOrdererConnectionSourceFactory.CreateConnectionSourceReturns(fakeOrdererConnectionSource) + fakeDeliverClient = &fake.DeliverClient{} fakeDeliverClient.RecvStub = func() (*orderer.DeliverResponse, error) { select { @@ -103,13 +122,17 @@ var _ = Describe("CFT-Deliverer", func() { fakeDurationExceededHandler = &fake.DurationExceededHandler{} fakeDurationExceededHandler.DurationExceededHandlerReturns(false) + channelConfig, fakeCryptoProvider, err = testSetup(tempDir, "CFT") + Expect(err).NotTo(HaveOccurred()) + d = &blocksprovider.Deliverer{ ChannelID: "channel-id", BlockHandler: fakeBlockHandler, Ledger: fakeLedgerInfo, UpdatableBlockVerifier: fakeUpdatableBlockVerifier, Dialer: fakeDialer, - Orderers: fakeOrdererConnectionSource, + OrderersSourceFactory: fakeOrdererConnectionSourceFactory, + CryptoProvider: fakeCryptoProvider, DoneC: make(chan struct{}), Signer: fakeSigner, DeliverStreamer: fakeDeliverStreamer, @@ -120,8 +143,7 @@ var _ = Describe("CFT-Deliverer", func() { MaxRetryInterval: 10 * time.Second, InitialRetryInterval: 100 * time.Millisecond, } - d.Initialize() - + d.Initialize(channelConfig) fakeSleeper = &fake.Sleeper{} blocksprovider.SetSleeper(d, fakeSleeper) }) @@ -138,6 +160,8 @@ var _ = Describe("CFT-Deliverer", func() { d.Stop() close(doneC) <-endC + + _ = os.RemoveAll(tempDir) }) It("waits patiently for new blocks from the orderer", func() { @@ -226,7 +250,7 @@ var _ = Describe("CFT-Deliverer", func() { When("the dialer returns an error", func() { BeforeEach(func() { fakeDialer.DialReturnsOnCall(0, nil, fmt.Errorf("fake-dial-error")) - cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial("localhost:6006", 
grpc.WithTransportCredentials(insecure.NewCredentials())) Expect(err).NotTo(HaveOccurred()) fakeDialer.DialReturnsOnCall(1, cc, nil) }) @@ -280,7 +304,7 @@ var _ = Describe("CFT-Deliverer", func() { }) It("hits the maximum sleep time value in an exponential fashion and retries until exceeding the max retry duration", func() { - Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(BeNumerically(">", 0)) + Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount, 5*time.Second).Should(BeNumerically(">", 0)) Eventually(endC).Should(BeClosed()) Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(380)) Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond)) @@ -384,7 +408,7 @@ var _ = Describe("CFT-Deliverer", func() { Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second)) Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second)) Expect(fakeSleeper.SleepArgsForCall(499)).To(Equal(10 * time.Second)) - Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(Equal(120)) + Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount, 5*time.Second).Should(Equal(120)) }) }) @@ -607,7 +631,6 @@ var _ = Describe("CFT-Deliverer", func() { doneC := doneC recvStep := recvStep fakeDeliverClient := fakeDeliverClient - env = &common.Envelope{ Payload: protoutil.MarshalOrPanic(&common.Payload{ Header: &common.Header{ @@ -616,7 +639,9 @@ var _ = Describe("CFT-Deliverer", func() { ChannelId: "test-chain", }), }, - Data: []byte("test bytes"), + Data: protoutil.MarshalOrPanic(&common.ConfigEnvelope{ + Config: channelConfig, // it must be a legal config that can produce a new bundle + }), }), } @@ -673,6 +698,18 @@ var _ = Describe("CFT-Deliverer", func() { )) Eventually(fakeUpdatableBlockVerifier.VerifyBlockCallCount).Should(Equal(1)) }) + + It("updates the orderer connection source", func() { + 
Eventually(fakeOrdererConnectionSource.UpdateCallCount).Should(Equal(2)) + globalAddresses, orgsAddresses := fakeOrdererConnectionSource.UpdateArgsForCall(1) + Expect(globalAddresses).To(BeNil()) + Expect(orgsAddresses).ToNot(BeNil()) + Expect(len(orgsAddresses)).To(Equal(1)) + orgAddr, ok := orgsAddresses["SampleOrg"] + Expect(ok).To(BeTrue()) + Expect(orgAddr.Addresses).To(Equal([]string{"127.0.0.1:7050"})) + Expect(len(orgAddr.RootCerts)).To(Equal(2)) + }) }) When("the deliver client returns a status", func() { @@ -717,3 +754,101 @@ var _ = Describe("CFT-Deliverer", func() { }) }) }) + +func testSetup(certDir string, consensusClass string) (*common.Config, bccsp.BCCSP, error) { + var configProfile *genesisconfig.Profile + tlsCA, err := tlsgen.NewCA() + if err != nil { + return nil, nil, err + } + + switch consensusClass { + case "CFT": + configProfile = genesisconfig.Load(genesisconfig.SampleAppChannelEtcdRaftProfile, configtest.GetDevConfigDir()) + err = generateCertificates(configProfile, tlsCA, certDir) + case "BFT": + configProfile = genesisconfig.Load(genesisconfig.SampleAppChannelSmartBftProfile, configtest.GetDevConfigDir()) + err = generateCertificatesSmartBFT(configProfile, tlsCA, certDir) + default: + err = fmt.Errorf("expected CFT or BFT") + } + + if err != nil { + return nil, nil, err + } + + bootstrapper, err := encoder.NewBootstrapper(configProfile) + if err != nil { + return nil, nil, err + } + channelConfigProto := &common.Config{ChannelGroup: bootstrapper.GenesisChannelGroup()} + cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) + if err != nil { + return nil, nil, err + } + + return channelConfigProto, cryptoProvider, nil +} + +// TODO this pattern repeats itself in several places.
Make it common in the 'genesisconfig' package to easily create +// Raft genesis blocks +func generateCertificates(confAppRaft *genesisconfig.Profile, tlsCA tlsgen.CA, certDir string) error { + for i, c := range confAppRaft.Orderer.EtcdRaft.Consenters { + srvC, err := tlsCA.NewServerCertKeyPair(c.Host) + if err != nil { + return err + } + srvP := path.Join(certDir, fmt.Sprintf("server%d.crt", i)) + err = os.WriteFile(srvP, srvC.Cert, 0o644) + if err != nil { + return err + } + + clnC, err := tlsCA.NewClientCertKeyPair() + if err != nil { + return err + } + clnP := path.Join(certDir, fmt.Sprintf("client%d.crt", i)) + err = os.WriteFile(clnP, clnC.Cert, 0o644) + if err != nil { + return err + } + + c.ServerTlsCert = []byte(srvP) + c.ClientTlsCert = []byte(clnP) + } + + return nil +} + +func generateCertificatesSmartBFT(confAppSmartBFT *genesisconfig.Profile, tlsCA tlsgen.CA, certDir string) error { + for i, c := range confAppSmartBFT.Orderer.ConsenterMapping { + srvC, err := tlsCA.NewServerCertKeyPair(c.Host) + if err != nil { + return err + } + + srvP := path.Join(certDir, fmt.Sprintf("server%d.crt", i)) + err = ioutil.WriteFile(srvP, srvC.Cert, 0o644) + if err != nil { + return err + } + + clnC, err := tlsCA.NewClientCertKeyPair() + if err != nil { + return err + } + + clnP := path.Join(certDir, fmt.Sprintf("client%d.crt", i)) + err = ioutil.WriteFile(clnP, clnC.Cert, 0o644) + if err != nil { + return err + } + + c.Identity = srvP + c.ServerTLSCert = srvP + c.ClientTLSCert = clnP + } + + return nil +} diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go index a8c3487a5a4..47a16fd2070 100644 --- a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go +++ b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go @@ -31,6 +31,12 @@ type OrdererConnectionSource struct { shuffledEndpointsReturnsOnCall map[int]struct { result1 []*orderers.Endpoint } + 
UpdateStub func([]string, map[string]orderers.OrdererOrg) + updateMutex sync.RWMutex + updateArgsForCall []struct { + arg1 []string + arg2 map[string]orderers.OrdererOrg + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -144,6 +150,44 @@ func (fake *OrdererConnectionSource) ShuffledEndpointsReturnsOnCall(i int, resul }{result1} } +func (fake *OrdererConnectionSource) Update(arg1 []string, arg2 map[string]orderers.OrdererOrg) { + var arg1Copy []string + if arg1 != nil { + arg1Copy = make([]string, len(arg1)) + copy(arg1Copy, arg1) + } + fake.updateMutex.Lock() + fake.updateArgsForCall = append(fake.updateArgsForCall, struct { + arg1 []string + arg2 map[string]orderers.OrdererOrg + }{arg1Copy, arg2}) + stub := fake.UpdateStub + fake.recordInvocation("Update", []interface{}{arg1Copy, arg2}) + fake.updateMutex.Unlock() + if stub != nil { + fake.UpdateStub(arg1, arg2) + } +} + +func (fake *OrdererConnectionSource) UpdateCallCount() int { + fake.updateMutex.RLock() + defer fake.updateMutex.RUnlock() + return len(fake.updateArgsForCall) +} + +func (fake *OrdererConnectionSource) UpdateCalls(stub func([]string, map[string]orderers.OrdererOrg)) { + fake.updateMutex.Lock() + defer fake.updateMutex.Unlock() + fake.UpdateStub = stub +} + +func (fake *OrdererConnectionSource) UpdateArgsForCall(i int) ([]string, map[string]orderers.OrdererOrg) { + fake.updateMutex.RLock() + defer fake.updateMutex.RUnlock() + argsForCall := fake.updateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *OrdererConnectionSource) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -151,6 +195,8 @@ func (fake *OrdererConnectionSource) Invocations() map[string][][]interface{} { defer fake.randomEndpointMutex.RUnlock() fake.shuffledEndpointsMutex.RLock() defer fake.shuffledEndpointsMutex.RUnlock() + fake.updateMutex.RLock() + defer fake.updateMutex.RUnlock() copiedInvocations := 
map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go new file mode 100644 index 00000000000..ee0607a87d0 --- /dev/null +++ b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go @@ -0,0 +1,113 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package fake + +import ( + "sync" + + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" +) + +type OrdererConnectionSourceFactory struct { + CreateConnectionSourceStub func(*flogging.FabricLogger) orderers.ConnectionSourcer + createConnectionSourceMutex sync.RWMutex + createConnectionSourceArgsForCall []struct { + arg1 *flogging.FabricLogger + } + createConnectionSourceReturns struct { + result1 orderers.ConnectionSourcer + } + createConnectionSourceReturnsOnCall map[int]struct { + result1 orderers.ConnectionSourcer + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger) orderers.ConnectionSourcer { + fake.createConnectionSourceMutex.Lock() + ret, specificReturn := fake.createConnectionSourceReturnsOnCall[len(fake.createConnectionSourceArgsForCall)] + fake.createConnectionSourceArgsForCall = append(fake.createConnectionSourceArgsForCall, struct { + arg1 *flogging.FabricLogger + }{arg1}) + stub := fake.CreateConnectionSourceStub + fakeReturns := fake.createConnectionSourceReturns + fake.recordInvocation("CreateConnectionSource", []interface{}{arg1}) + fake.createConnectionSourceMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake 
*OrdererConnectionSourceFactory) CreateConnectionSourceCallCount() int { + fake.createConnectionSourceMutex.RLock() + defer fake.createConnectionSourceMutex.RUnlock() + return len(fake.createConnectionSourceArgsForCall) +} + +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger) orderers.ConnectionSourcer) { + fake.createConnectionSourceMutex.Lock() + defer fake.createConnectionSourceMutex.Unlock() + fake.CreateConnectionSourceStub = stub +} + +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) *flogging.FabricLogger { + fake.createConnectionSourceMutex.RLock() + defer fake.createConnectionSourceMutex.RUnlock() + argsForCall := fake.createConnectionSourceArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceReturns(result1 orderers.ConnectionSourcer) { + fake.createConnectionSourceMutex.Lock() + defer fake.createConnectionSourceMutex.Unlock() + fake.CreateConnectionSourceStub = nil + fake.createConnectionSourceReturns = struct { + result1 orderers.ConnectionSourcer + }{result1} +} + +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceReturnsOnCall(i int, result1 orderers.ConnectionSourcer) { + fake.createConnectionSourceMutex.Lock() + defer fake.createConnectionSourceMutex.Unlock() + fake.CreateConnectionSourceStub = nil + if fake.createConnectionSourceReturnsOnCall == nil { + fake.createConnectionSourceReturnsOnCall = make(map[int]struct { + result1 orderers.ConnectionSourcer + }) + } + fake.createConnectionSourceReturnsOnCall[i] = struct { + result1 orderers.ConnectionSourcer + }{result1} +} + +func (fake *OrdererConnectionSourceFactory) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createConnectionSourceMutex.RLock() + defer fake.createConnectionSourceMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, 
value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *OrdererConnectionSourceFactory) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ blocksprovider.OrdererConnectionSourceFactory = new(OrdererConnectionSourceFactory) diff --git a/internal/pkg/peer/blocksprovider/util.go b/internal/pkg/peer/blocksprovider/util.go index 96c42d7473d..7319787ab67 100644 --- a/internal/pkg/peer/blocksprovider/util.go +++ b/internal/pkg/peer/blocksprovider/util.go @@ -9,6 +9,11 @@ package blocksprovider import ( "math" "time" + + cb "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/bccsp" + "github.com/hyperledger/fabric/common/channelconfig" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" ) type errRefreshEndpoint struct { @@ -83,3 +88,26 @@ type timeNumber struct { t time.Time n uint64 } + +func extractAddresses(channelID string, config *cb.Config, cryptoProvider bccsp.BCCSP) ([]string, map[string]orderers.OrdererOrg, error) { + bundle, err := channelconfig.NewBundle(channelID, config, cryptoProvider) + if err != nil { + return nil, nil, err + } + globalAddresses := bundle.ChannelConfig().OrdererAddresses() + orgAddresses := map[string]orderers.OrdererOrg{} + if ordererConfig, ok := bundle.OrdererConfig(); ok { + for orgName, org := range ordererConfig.Organizations() { + var certs [][]byte + certs = append(certs, org.MSP().GetTLSRootCerts()...) + certs = append(certs, org.MSP().GetTLSIntermediateCerts()...) 
+ + orgAddresses[orgName] = orderers.OrdererOrg{ + Addresses: org.Endpoints(), + RootCerts: certs, + } + } + } + + return globalAddresses, orgAddresses, nil +} diff --git a/internal/pkg/peer/orderers/connection.go b/internal/pkg/peer/orderers/connection.go index bc129767ec3..ddb59cc4496 100644 --- a/internal/pkg/peer/orderers/connection.go +++ b/internal/pkg/peer/orderers/connection.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/hyperledger/fabric/common/flogging" - "github.com/pkg/errors" ) @@ -99,7 +98,7 @@ func (cs *ConnectionSource) ShuffledEndpoints() []*Endpoint { func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]OrdererOrg) { cs.mutex.Lock() defer cs.mutex.Unlock() - cs.logger.Debug("Processing updates for orderer endpoints") + cs.logger.Infof("Processing updates for orderer endpoints: global: %v; orgs: %v", globalAddrs, orgs) newOrgToEndpointsHash := map[string][]byte{} diff --git a/internal/pkg/peer/orderers/connection_factory.go b/internal/pkg/peer/orderers/connection_factory.go new file mode 100644 index 00000000000..0405adf0150 --- /dev/null +++ b/internal/pkg/peer/orderers/connection_factory.go @@ -0,0 +1,29 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package orderers + +import ( + "github.com/hyperledger/fabric/common/flogging" +) + +type ConnectionSourcer interface { + RandomEndpoint() (*Endpoint, error) + ShuffledEndpoints() []*Endpoint + Update(globalAddrs []string, orgs map[string]OrdererOrg) +} + +type ConnectionSourceCreator interface { + CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer +} + +type ConnectionSourceFactory struct { + Overrides map[string]*Endpoint +} + +func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer { + return NewConnectionSource(logger, f.Overrides) +} diff --git a/internal/pkg/peer/orderers/connection_factory_test.go b/internal/pkg/peer/orderers/connection_factory_test.go new file mode 100644 index 00000000000..e362279a4ab --- /dev/null +++ b/internal/pkg/peer/orderers/connection_factory_test.go @@ -0,0 +1,36 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package orderers_test + +import ( + "testing" + + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/stretchr/testify/require" +) + +func TestCreateConnectionSource(t *testing.T) { + factory := &orderers.ConnectionSourceFactory{} + require.NotNil(t, factory) + require.Nil(t, factory.Overrides) + lg := flogging.MustGetLogger("test") + connSource := factory.CreateConnectionSource(lg) + require.NotNil(t, connSource) + + overrides := make(map[string]*orderers.Endpoint) + overrides["127.0.0.1:1111"] = &orderers.Endpoint{ + Address: "127.0.0.1:2222", + RootCerts: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, + Refreshed: make(chan struct{}), + } + factory = &orderers.ConnectionSourceFactory{Overrides: overrides} + require.NotNil(t, factory) + require.Len(t, factory.Overrides, 1) + connSource = factory.CreateConnectionSource(lg) + require.NotNil(t, connSource) +} diff --git a/orderer/common/cluster/util.go 
b/orderer/common/cluster/util.go index e1ad2ab9798..f73e8ef56ac 100644 --- a/orderer/common/cluster/util.go +++ b/orderer/common/cluster/util.go @@ -24,15 +24,14 @@ import ( "sync/atomic" "time" - "github.com/golang/protobuf/ptypes/timestamp" - "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" "github.com/hyperledger/fabric-config/protolator" "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric/bccsp" "github.com/hyperledger/fabric/common/channelconfig" - "github.com/hyperledger/fabric/common/configtx" + "github.com/hyperledger/fabric/common/deliverclient" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/common/util" @@ -209,47 +208,6 @@ type Dialer interface { Dial(endpointCriteria EndpointCriteria) (*grpc.ClientConn, error) } -var errNotAConfig = errors.New("not a config block") - -// ConfigFromBlock returns a ConfigEnvelope if exists, or a *NotAConfigBlock error. -// It may also return some other error in case parsing failed. 
-func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) { - if block == nil || block.Data == nil || len(block.Data.Data) == 0 { - return nil, errors.New("empty block") - } - txn := block.Data.Data[0] - env, err := protoutil.GetEnvelopeFromBlock(txn) - if err != nil { - return nil, errors.WithStack(err) - } - payload, err := protoutil.UnmarshalPayload(env.Payload) - if err != nil { - return nil, errors.WithStack(err) - } - if block.Header.Number == 0 { - configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) - if err != nil { - return nil, errors.Wrap(err, "invalid config envelope") - } - return configEnvelope, nil - } - if payload.Header == nil { - return nil, errors.New("nil header in payload") - } - chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader) - if err != nil { - return nil, errors.WithStack(err) - } - if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG { - return nil, errNotAConfig - } - configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) - if err != nil { - return nil, errors.Wrap(err, "invalid config envelope") - } - return configEnvelope, nil -} - // VerifyBlockHash verifies the hash chain of the block with the given index // among the blocks of the given block buffer. func VerifyBlockHash(indexInBuffer int, blockBuff []*common.Block) error { @@ -792,9 +750,9 @@ func verifyBlockSequence(blockBuff []*common.Block, signatureVerifier protoutil. 
if err := VerifyBlockHash(i, blockBuff); err != nil { return err } - configFromBlock, err := ConfigFromBlock(block) + configFromBlock, err := deliverclient.ConfigFromBlock(block) - if err != nil && err != errNotAConfig { + if err != nil && err != deliverclient.ErrNotAConfig { return err } diff --git a/orderer/common/cluster/util_test.go b/orderer/common/cluster/util_test.go index 7f0f4a99bca..a93c52c0192 100644 --- a/orderer/common/cluster/util_test.go +++ b/orderer/common/cluster/util_test.go @@ -469,96 +469,6 @@ func TestEndpointconfigFromConfigBlockFailures(t *testing.T) { }) } -func TestConfigFromBlockBadInput(t *testing.T) { - for _, testCase := range []struct { - name string - block *common.Block - expectedError string - }{ - { - name: "nil block", - expectedError: "empty block", - block: nil, - }, - { - name: "nil block data", - expectedError: "empty block", - block: &common.Block{}, - }, - { - name: "no data in block", - expectedError: "empty block", - block: &common.Block{Data: &common.BlockData{}}, - }, - { - name: "invalid payload", - expectedError: "error unmarshalling Envelope", - block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}}, - }, - { - name: "bad genesis block", - expectedError: "invalid config envelope", - block: &common.Block{ - Header: &common.BlockHeader{}, Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ - Payload: protoutil.MarshalOrPanic(&common.Payload{ - Data: []byte{1, 2, 3}, - }), - })}}, - }, - }, - { - name: "invalid envelope in block", - expectedError: "error unmarshalling Envelope", - block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}}, - }, - { - name: "invalid payload in block envelope", - expectedError: "error unmarshalling Payload", - block: &common.Block{Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ - Payload: []byte{1, 2, 3}, - })}}}, - }, - { - name: "invalid channel header", - expectedError: "error unmarshalling 
ChannelHeader", - block: &common.Block{ - Header: &common.BlockHeader{Number: 1}, - Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ - Payload: protoutil.MarshalOrPanic(&common.Payload{ - Header: &common.Header{ - ChannelHeader: []byte{1, 2, 3}, - }, - }), - })}}, - }, - }, - { - name: "invalid config block", - expectedError: "invalid config envelope", - block: &common.Block{ - Header: &common.BlockHeader{}, - Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{ - Payload: protoutil.MarshalOrPanic(&common.Payload{ - Data: []byte{1, 2, 3}, - Header: &common.Header{ - ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{ - Type: int32(common.HeaderType_CONFIG), - }), - }, - }), - })}}, - }, - }, - } { - t.Run(testCase.name, func(t *testing.T) { - conf, err := cluster.ConfigFromBlock(testCase.block) - require.Nil(t, conf) - require.Error(t, err) - require.Contains(t, err.Error(), testCase.expectedError) - }) - } -} - func TestBlockValidationPolicyVerifier(t *testing.T) { config := genesisconfig.Load(genesisconfig.SampleInsecureSoloProfile, configtest.GetDevConfigDir()) group, err := encoder.NewChannelGroup(config) diff --git a/orderer/common/follower/block_puller.go b/orderer/common/follower/block_puller.go index 0fa40c35da3..9c1986ddec8 100644 --- a/orderer/common/follower/block_puller.go +++ b/orderer/common/follower/block_puller.go @@ -123,7 +123,7 @@ func (creator *BlockPullerCreator) BlockPuller(configBlock *common.Block, stopCh // UpdateVerifierFromConfigBlock creates a new block signature verifier from the config block and updates the internal // link to said verifier. 
func (creator *BlockPullerCreator) UpdateVerifierFromConfigBlock(configBlock *common.Block) error { - configEnv, err := cluster.ConfigFromBlock(configBlock) + configEnv, err := deliverclient.ConfigFromBlock(configBlock) if err != nil { return errors.WithMessage(err, "failed to extract config envelope from block") } @@ -154,7 +154,7 @@ func (creator *BlockPullerCreator) VerifyBlockSequence(blocks []*common.Block, _ // with our own. blocks[0] = creator.JoinBlock } - configEnv, err := cluster.ConfigFromBlock(blocks[0]) + configEnv, err := deliverclient.ConfigFromBlock(blocks[0]) if err != nil { return errors.WithMessage(err, "failed to extract config envelope from genesis block") } diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index fed5dbe258b..12158709676 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/fabric/bccsp" "github.com/hyperledger/fabric/common/channelconfig" "github.com/hyperledger/fabric/common/crypto" + "github.com/hyperledger/fabric/common/deliverclient" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/orderer/common/cluster" "github.com/hyperledger/fabric/orderer/common/localconfig" @@ -52,7 +53,7 @@ type RuntimeConfig struct { // BlockCommitted updates the config from the block func (rtc RuntimeConfig) BlockCommitted(block *cb.Block, bccsp bccsp.BCCSP) (RuntimeConfig, error) { - if _, err := cluster.ConfigFromBlock(block); err == nil { + if _, err := deliverclient.ConfigFromBlock(block); err == nil { return rtc.configBlockCommitted(block, bccsp) } return RuntimeConfig{