Skip to content

Commit

Permalink
FAB-17890 Ch.Part.API: allow registrar to list a single channel (hype…
Browse files Browse the repository at this point in the history
…rledger#1349)

* FAB-17890 Ch.Part.API: allow registrar to list a single channel

Implement the ChannelInfo(..) method in the registrar that looks up
a channel in the chains map and reports extended information on its
status. For this end we have to introduce the follower.Chain and
augment existing cluster type chains with means of reporting their
status.

- Introduce the skeleton a new type of consensus.Chain
  implementation: follower.Chain. This will be created and run when
  the orderer is required to follow the cluster and pull blocks from
  other orderers.

- The plan (for future commits) is for the follower.Chain to trigger
  the creation of an etcdraft.Chain when it discovers the orderer was
  added to the cluster, and vise versa; the etcdraft.Chain will replace
  itself with a follower.Chain when the orderer is removed from the
  cluster.

- Introduce a new interface that cluster-type chains implement, that
  allows them to report their relation to the cluster and their status.
  This is done because the registrar is not aware of the exact type of
  the chains it is keeping. The registrar cannot reflect on the type
  as well, as this will cause an import cycle (due to the etcdraft
  package importing multichannel).

Signed-off-by: Yoav Tock <[email protected]>
Change-Id: Ia454f47f04a8ba3dcd76886a5919d1c734c01015

* Review comments: add constants

Add typed constatnts for the possible values of
ChannelInfo.Status & ChannelInfo.ClusterRelation.

Signed-off-by: Yoav Tock <[email protected]>
Change-Id: I71062b944d4e5236c4df1462fa73740b68e2065b

* Review comments: improve documentation

Document types.ClusterRelation and types.Status.

Signed-off-by: Yoav Tock <[email protected]>
Change-Id: I8171e6cfe6c3a137e7f1f48a14d1b2fe7f1aea5a
  • Loading branch information
tock-ibm authored Jun 2, 2020
1 parent 17f5c7f commit 450ba3a
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 15 deletions.
11 changes: 11 additions & 0 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
Expand All @@ -34,6 +35,11 @@ type ChainSupport struct {
// that there is a single consensus type at this orderer node and therefore the resolution of
// the consensus type too happens only at the ChainSupport level.
consensus.MetadataValidator

// The registrar is not aware of the exact type that the Chain is, e.g. etcdraft, inactive, or follower.
// Therefore, we let each chain report its cluster relation and status through this interface. Non cluster
// type chains (solo, kafka) are assigned a static reporter.
consensus.StatusReporter
}

func newChainSupport(
Expand Down Expand Up @@ -88,6 +94,11 @@ func newChainSupport(
cs.MetadataValidator = consensus.NoOpMetadataValidator{}
}

cs.StatusReporter, ok = cs.Chain.(consensus.StatusReporter)
if !ok { // Non-cluster types: solo, kafka
cs.StatusReporter = consensus.StaticStatusReporter{ClusterRelation: types.ClusterRelationNone, Status: types.StatusActive}
}

logger.Debugf("[channel: %s] Done creating channel support resources", cs.ChannelID())

return cs
Expand Down
16 changes: 14 additions & 2 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,20 @@ func (r *Registrar) ChannelList() types.ChannelList {
}

func (r *Registrar) ChannelInfo(channelID string) (types.ChannelInfo, error) {
//TODO
return types.ChannelInfo{}, errors.New("Not implemented yet")
r.lock.RLock()
defer r.lock.RUnlock()

info := types.ChannelInfo{}
cs, ok := r.chains[channelID]
if !ok {
return info, types.ErrChannelNotExist
}

info.Name = channelID
info.Height = cs.Height()
info.ClusterRelation, info.Status = cs.StatusReport()

return info, nil
}

func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block) (types.ChannelInfo, error) {
Expand Down
32 changes: 28 additions & 4 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func TestNewRegistrar(t *testing.T) {
}, "Should not panic when starting without a system channel")
require.NotNil(t, manager)
list := manager.ChannelList()
assert.Equal(t, types.ChannelList{SystemChannel: nil, Channels: nil}, list)
assert.Equal(t, types.ChannelList{}, list)
info, err := manager.ChannelInfo("my-channel")
assert.EqualError(t, err, types.ErrChannelNotExist.Error())
assert.Equal(t, types.ChannelInfo{}, info)
})

// This test checks to make sure that the orderer refuses to come up if there are multiple system channels
Expand Down Expand Up @@ -231,6 +234,13 @@ func TestNewRegistrar(t *testing.T) {
list,
)

info, err := manager.ChannelInfo("testchannelid")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "testchannelid", URL: "", ClusterRelation: "none", Status: "active", Height: 1},
info,
)

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, "testchannelid", chainSupport, rl, t)
})
}
Expand All @@ -251,7 +261,7 @@ func TestCreateChain(t *testing.T) {
lf, _ := newLedgerAndFactory(tmpdir, "testchannelid", genesisBlockSys)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}
consenters[confSys.Orderer.OrdererType] = &mockConsenter{cluster: true}

manager := NewRegistrar(localconfig.TopLevel{}, lf, mockCrypto(), &disabled.Provider{}, cryptoProvider)
manager.Initialize(consenters)
Expand All @@ -278,18 +288,32 @@ func TestCreateChain(t *testing.T) {
list,
)

info, err := manager.ChannelInfo("testchannelid")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "testchannelid", URL: "", ClusterRelation: types.ClusterRelationMember, Status: types.StatusActive, Height: 1},
info,
)

info, err = manager.ChannelInfo("mychannel")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "mychannel", URL: "", ClusterRelation: types.ClusterRelationMember, Status: types.StatusActive, Height: 1},
info,
)

// A subsequent creation, replaces the chain.
manager.CreateChain("mychannel")
chain2 := manager.GetChain("mychannel")
assert.NotNil(t, chain2)
// They are not the same
assert.NotEqual(t, chain, chain2)
// The old chain is halted
_, ok := <-chain.Chain.(*mockChain).queue
_, ok := <-chain.Chain.(*mockChainCluster).queue
assert.False(t, ok)

// The new chain is not halted: Close the channel to prove that.
close(chain2.Chain.(*mockChain).queue)
close(chain2.Chain.(*mockChainCluster).queue)
})

// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
Expand Down
22 changes: 20 additions & 2 deletions orderer/common/multichannel/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package multichannel

import (
"fmt"
"github.com/hyperledger/fabric/orderer/common/types"

cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/capabilities"
Expand All @@ -23,16 +24,33 @@ import (
)

type mockConsenter struct {
cluster bool
}

func (mc *mockConsenter) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
return &mockChain{
chain := &mockChain{
queue: make(chan *cb.Envelope),
cutter: support.BlockCutter(),
support: support,
metadata: metadata,
done: make(chan struct{}),
}, nil
}

if mc.cluster {
clusterChain := &mockChainCluster{}
clusterChain.mockChain = chain
return clusterChain, nil
}

return chain, nil
}

type mockChainCluster struct {
*mockChain
}

func (c *mockChainCluster) StatusReport() (types.ClusterRelation, types.Status) {
return types.ClusterRelationMember, types.StatusActive
}

type mockChain struct {
Expand Down
44 changes: 40 additions & 4 deletions orderer/common/types/channelinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,53 @@ type ChannelInfoShort struct {
URL string `json:"url"`
}

// ClusterRelation represents the relationship between the orderer and the channel's consensus cluster.
type ClusterRelation string

const (
// The orderer is a cluster member of a cluster consensus protocol (e.g. etcdraft) for a specific channel.
// That is, the orderer is in the consenters set of the channel.
ClusterRelationMember ClusterRelation = "member"
// The orderer is following a cluster consensus protocol by pulling blocks from other orderers.
// The orderer is NOT in the consenters set of the channel.
ClusterRelationFollower ClusterRelation = "follower"
// The orderer is NOT in the consenters set of the channel, and is just tracking (polling) the last config block
// of the channel in order to detect when it is added to the channel.
ClusterRelationConfigTracker ClusterRelation = "config-tracker"
// The orderer runs a non-cluster consensus type, solo or kafka.
ClusterRelationNone ClusterRelation = "none"
)

// Status represents the degree by which the orderer had caught up with the rest of the cluster after joining the
// channel (either as a member or a follower).
type Status string

const (
// The orderer is active in the channel's consensus protocol, or following the cluster,
// with block height > the join-block number. (Height is last block number +1).
StatusActive Status = "active"
// The orderer is catching up with the cluster by pulling blocks from other orderers,
// with block height <= the join-block number.
StatusOnBoarding Status = "onboarding"
// The orderer is not storing any blocks for this channel.
StatusInactive Status = "inactive"
)

// ChannelInfo carries the response to an HTTP request to List a single channel.
// This is marshaled into the body of the HTTP response.
type ChannelInfo struct {
// The channel name.
Name string `json:"name"`
// The channel relative URL (no Host:Port, only path), e.g.: "/participation/v1/channels/my-channel".
URL string `json:"url"`
// Whether the orderer is a “member” or ”follower” of the cluster, for this channel. Case insensitive.
ClusterRelation string `json:"clusterRelation"`
// Whether the orderer is ”onboarding” or ”active”, for this channel. Case insensitive.
Status string `json:"status"`
// Whether the orderer is a “member” or ”follower” of the cluster, or "config-tracker" of the cluster, for this channel.
// For non cluster consensus types (solo, kafka) it is "none".
// Possible values: “member”, ”follower”, "config-tracker", "none".
ClusterRelation ClusterRelation `json:"clusterRelation"`
// Whether the orderer is ”onboarding”, ”active”, or "inactive", for this channel.
// For non cluster consensus types (solo, kafka) it is "active".
// Possible values: “onboarding”, ”active”, "inactive".
Status Status `json:"status"`
// Current block height.
Height uint64 `json:"height"`
}
4 changes: 2 additions & 2 deletions orderer/common/types/channelinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestChannelInfo(t *testing.T) {
info := types.ChannelInfo{
Name: "a",
URL: "/api/channels/a",
ClusterRelation: "follower",
Status: "active",
ClusterRelation: types.ClusterRelationFollower,
Status: types.StatusActive,
Height: uint64(1) << 60,
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/common/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ var ErrChannelAlreadyExists = errors.New("channel already exists")
// already exist.
var ErrAppChannelsAlreadyExists = errors.New("application channels already exist")

// This error is returned when trying to remove a channel that does not exist
// This error is returned when trying to remove or list a channel that does not exist
var ErrChannelNotExist = errors.New("channel does not exist")
32 changes: 32 additions & 0 deletions orderer/consensus/cluster_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package consensus

import "github.com/hyperledger/fabric/orderer/common/types"

// StatusReporter is implemented by cluster-type Chain implementations.
// It allows the node to report its cluster relation and its status within that relation.
// This information is used to generate the channelparticipation.ChannelInfo in response
// to a "List" request on a particular channel.
//
// Not all chains must implement this, in particular non-cluster-type (solo, kafka) are
// assigned a StaticStatusReporter at construction time.
type StatusReporter interface {
// StatusReport provides the cluster relation and status.
// See: channelparticipation.ChannelInfo for more details.
StatusReport() (types.ClusterRelation, types.Status)
}

// StaticStatusReporter is intended for chains that do not implement the StatusReporter interface.
type StaticStatusReporter struct {
ClusterRelation types.ClusterRelation
Status types.Status
}

func (s StaticStatusReporter) StatusReport() (types.ClusterRelation, types.Status) {
return s.ClusterRelation, s.Status
}
27 changes: 27 additions & 0 deletions orderer/consensus/cluster_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package consensus_test

import (
"github.com/hyperledger/fabric/orderer/common/types"
"testing"

"github.com/hyperledger/fabric/orderer/consensus"
"github.com/stretchr/testify/assert"
)

func TestStaticStatusReporter(t *testing.T) {
staticSR := &consensus.StaticStatusReporter{
ClusterRelation: types.ClusterRelationNone,
Status: types.StatusActive,
}

var sr consensus.StatusReporter = staticSR // make sure it implements this interface
cRel, status := sr.StatusReport()
assert.Equal(t, types.ClusterRelationNone, cRel)
assert.Equal(t, types.StatusActive, status)
}
6 changes: 6 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/pem"
"fmt"
"github.com/hyperledger/fabric/orderer/common/types"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1315,6 +1316,11 @@ func (c *Chain) ValidateConsensusMetadata(oldMetadataBytes, newMetadataBytes []b
return nil
}

// StatusReport returns the ClusterRelation & Status
func (c *Chain) StatusReport() (types.ClusterRelation, types.Status) {
return types.ClusterRelationMember, types.StatusActive
}

func (c *Chain) suspectEviction() bool {
if c.isRunning() != nil {
return false
Expand Down
4 changes: 4 additions & 0 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
orderer_types "github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
Expand Down Expand Up @@ -202,6 +203,9 @@ var _ = Describe("Chain", func() {
Expect(err).NotTo(HaveOccurred())

chain.Start()
cRel, status := chain.StatusReport()
Expect(cRel).To(Equal(orderer_types.ClusterRelationMember))
Expect(status).To(Equal(orderer_types.StatusActive))

// When the Raft node bootstraps, it produces a ConfChange
// to add itself, which needs to be consumed with Ready().
Expand Down
Loading

0 comments on commit 450ba3a

Please sign in to comment.