Skip to content

Commit

Permalink
Merge "[FAB-15031] Collect common ledger stats in blockstorage"
Browse files Browse the repository at this point in the history
  • Loading branch information
manish-sethi authored and Gerrit Code Review committed Jun 12, 2019
2 parents 2fdb76f + 406c80c commit f2cc88c
Show file tree
Hide file tree
Showing 22 changed files with 376 additions and 156 deletions.
5 changes: 3 additions & 2 deletions common/ledger/blkstorage/fsblkstorage/blockindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
Expand Down Expand Up @@ -123,7 +124,7 @@ func testBlockIndexSelectiveIndexingWrongConfig(t *testing.T, indexItems []blkst
testName = testName + string(s)
}
t.Run(testName, func(t *testing.T) {
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems)
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems, &disabled.Provider{})
defer env.Cleanup()

assert.Panics(t, func() {
Expand All @@ -150,7 +151,7 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.Index
testName = testName + string(s)
}
t.Run(testName, func(t *testing.T) {
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems)
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems, &disabled.Provider{})
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testledger")
defer blkfileMgrWrapper.close()
Expand Down
28 changes: 25 additions & 3 deletions common/ledger/blkstorage/fsblkstorage/fs_blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package fsblkstorage

import (
"time"

"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
Expand All @@ -29,17 +31,32 @@ type fsBlockStore struct {
id string
conf *Conf
fileMgr *blockfileMgr
stats *ledgerStats
}

// NewFsBlockStore constructs a `FsBlockStore`
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
dbHandle *leveldbhelper.DBHandle) *fsBlockStore {
return &fsBlockStore{id, conf, newBlockfileMgr(id, conf, indexConfig, dbHandle)}
dbHandle *leveldbhelper.DBHandle, stats *stats) *fsBlockStore {
fileMgr := newBlockfileMgr(id, conf, indexConfig, dbHandle)

// create ledgerStats and initialize blockchain_height stat
ledgerStats := stats.ledgerStats(id)
info := fileMgr.getBlockchainInfo()
ledgerStats.updateBlockchainHeight(info.Height)

return &fsBlockStore{id, conf, fileMgr, ledgerStats}
}

// AddBlock adds a new block
func (store *fsBlockStore) AddBlock(block *common.Block) error {
return store.fileMgr.addBlock(block)
// track elapsed time to collect block commit time
startBlockCommit := time.Now()
result := store.fileMgr.addBlock(block)
elapsedBlockCommit := time.Since(startBlockCommit)

store.updateBlockStats(block.Header.Number, elapsedBlockCommit)

return result
}

// GetBlockchainInfo returns the current info about blockchain
Expand Down Expand Up @@ -85,3 +102,8 @@ func (store *fsBlockStore) Shutdown() {
logger.Debugf("closing fs blockStore:%s", store.id)
store.fileMgr.close()
}

func (store *fsBlockStore) updateBlockStats(blockNum uint64, blockstorageCommitTime time.Duration) {
store.stats.updateBlockchainHeight(blockNum + 1)
store.stats.updateBlockstorageCommitTime(blockstorageCommitTime)
}
10 changes: 7 additions & 3 deletions common/ledger/blkstorage/fsblkstorage/fs_blockstore_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/common/metrics"
)

// FsBlockstoreProvider provides handle to block storage - this is not thread-safe
type FsBlockstoreProvider struct {
conf *Conf
indexConfig *blkstorage.IndexConfig
leveldbProvider *leveldbhelper.Provider
stats *stats
}

// NewProvider constructs a filesystem based block store provider
func NewProvider(conf *Conf, indexConfig *blkstorage.IndexConfig) blkstorage.BlockStoreProvider {
func NewProvider(conf *Conf, indexConfig *blkstorage.IndexConfig, metricsProvider metrics.Provider) blkstorage.BlockStoreProvider {
p := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: conf.getIndexDir()})
return &FsBlockstoreProvider{conf, indexConfig, p}
// create stats instance at provider level and pass to newFsBlockStore
stats := newStats(metricsProvider)
return &FsBlockstoreProvider{conf, indexConfig, p, stats}
}

// CreateBlockStore simply calls OpenBlockStore
Expand All @@ -45,7 +49,7 @@ func (p *FsBlockstoreProvider) CreateBlockStore(ledgerid string) (blkstorage.Blo
// This method should be invoked only once for a particular ledgerid
func (p *FsBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) {
indexStoreHandle := p.leveldbProvider.GetDBHandle(ledgerid)
return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle), nil
return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle, p.stats), nil
}

// Exists tells whether the BlockStore with given id exists
Expand Down
67 changes: 67 additions & 0 deletions common/ledger/blkstorage/fsblkstorage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
"time"

"github.com/hyperledger/fabric/common/metrics"
)

type stats struct {
blockchainHeight metrics.Gauge
blockstorageCommitTime metrics.Histogram
}

func newStats(metricsProvider metrics.Provider) *stats {
stats := &stats{}
stats.blockchainHeight = metricsProvider.NewGauge(blockchainHeightOpts)
stats.blockstorageCommitTime = metricsProvider.NewHistogram(blockstorageCommitTimeOpts)
return stats
}

// ledgerStats defines block metrics that are common for both orderer and peer
type ledgerStats struct {
stats *stats
ledgerid string
}

func (s *stats) ledgerStats(ledgerid string) *ledgerStats {
return &ledgerStats{
s, ledgerid,
}
}

func (s *ledgerStats) updateBlockchainHeight(height uint64) {
// casting uint64 to float64 guarantees precision for the numbers upto 9,007,199,254,740,992 (1<<53)
// since, we are not expecting the blockchains of this scale anytime soon, we go ahead with this for now.
s.stats.blockchainHeight.With("channel", s.ledgerid).Set(float64(height))
}

func (s *ledgerStats) updateBlockstorageCommitTime(timeTaken time.Duration) {
s.stats.blockstorageCommitTime.With("channel", s.ledgerid).Observe(timeTaken.Seconds())
}

var (
blockchainHeightOpts = metrics.GaugeOpts{
Namespace: "ledger",
Subsystem: "",
Name: "blockchain_height",
Help: "Height of the chain in blocks.",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
}

blockstorageCommitTimeOpts = metrics.HistogramOpts{
Namespace: "ledger",
Subsystem: "",
Name: "blockstorage_commit_time",
Help: "Time taken in seconds for committing the block to storage.",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
Buckets: []float64{0.005, 0.01, 0.015, 0.05, 0.1, 1, 10},
}
)
166 changes: 166 additions & 0 deletions common/ledger/blkstorage/fsblkstorage/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
"testing"
"time"

"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
"github.com/hyperledger/fabric/common/util"
"github.com/stretchr/testify/assert"
)

func TestStatsBlockchainHeight(t *testing.T) {
testMetricProvider := testutilConstructMetricProvider()
env := newTestEnvWithMetricsProvider(t, NewConf(testPath(), 0), testMetricProvider.fakeProvider)
defer env.Cleanup()

provider := env.provider
ledgerid := "ledger-stats"
store, err := provider.OpenBlockStore(ledgerid)
assert.NoError(t, err)
defer store.Shutdown()

// add genesis block
blockGenerator, genesisBlock := testutil.NewBlockGenerator(t, util.GetTestChainID(), false)
err = store.AddBlock(genesisBlock)
assert.NoError(t, err)

// add one more block
b1 := blockGenerator.NextBlock([][]byte{})
err = store.AddBlock(b1)
assert.NoError(t, err)

// should have 3 calls for fakeBlockchainHeightGauge: OpenBlockStore, genesis block, and block b1
fakeBlockchainHeightGauge := testMetricProvider.fakeBlockchainHeightGauge
expectedCallCount := 3
assert.Equal(t, expectedCallCount, fakeBlockchainHeightGauge.SetCallCount())

// verify the call for OpenBlockStore
assert.Equal(t, float64(0), fakeBlockchainHeightGauge.SetArgsForCall(0))
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(0))

// verify the call for adding genesis block
assert.Equal(t, float64(1), fakeBlockchainHeightGauge.SetArgsForCall(1))
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(1))

// verify the call for adding block b1
assert.Equal(t, float64(2), fakeBlockchainHeightGauge.SetArgsForCall(2))
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(2))

// shutdown and reopen the store to verify blockchain height
store.Shutdown()
store, err = provider.OpenBlockStore(ledgerid)
assert.NoError(t, err)

// verify the call when opening an existing ledger - should set height correctly
assert.Equal(t, float64(2), fakeBlockchainHeightGauge.SetArgsForCall(3))
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(3))

// invoke updateBlockStats api explicitly and verify the call with fake metrics
store.(*fsBlockStore).updateBlockStats(10, 1*time.Second)
assert.Equal(t, float64(11), fakeBlockchainHeightGauge.SetArgsForCall(4))
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(4))
}

func TestStatsBlockCommit(t *testing.T) {
testMetricProvider := testutilConstructMetricProvider()
env := newTestEnvWithMetricsProvider(t, NewConf(testPath(), 0), testMetricProvider.fakeProvider)
defer env.Cleanup()

provider := env.provider
ledgerid := "ledger-stats"
store, err := provider.OpenBlockStore(ledgerid)
assert.NoError(t, err)
defer store.Shutdown()

// add a genesis block
blockGenerator, genesisBlock := testutil.NewBlockGenerator(t, util.GetTestChainID(), false)
err = store.AddBlock(genesisBlock)
assert.NoError(t, err)

// add 3 more blocks
for i := 1; i <= 3; i++ {
b := blockGenerator.NextBlock([][]byte{})
err = store.AddBlock(b)
assert.NoError(t, err)
}

fakeBlockstorageCommitTimeHist := testMetricProvider.fakeBlockstorageCommitTimeHist

// should have 4 calls to fakeBlockstorageCommitTimeHist: genesis block, and 3 blocks
expectedCallCount := 1 + 3
assert.Equal(t, expectedCallCount, fakeBlockstorageCommitTimeHist.ObserveCallCount())

// verify the value of channel in each call (0, 1, 2, 3)
for i := 0; i < expectedCallCount; i++ {
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockstorageCommitTimeHist.WithArgsForCall(i))
}

// invoke updateBlockStats api explicitly and verify with fake metrics (call number is 4)
store.(*fsBlockStore).updateBlockStats(4, 10*time.Second)
assert.Equal(t,
[]string{"channel", ledgerid},
testMetricProvider.fakeBlockstorageCommitTimeHist.WithArgsForCall(4),
)
assert.Equal(t,
float64(10),
testMetricProvider.fakeBlockstorageCommitTimeHist.ObserveArgsForCall(4),
)
}

type testMetricProvider struct {
fakeProvider *metricsfakes.Provider
fakeBlockchainHeightGauge *metricsfakes.Gauge
fakeBlockstorageCommitTimeHist *metricsfakes.Histogram
}

func testutilConstructMetricProvider() *testMetricProvider {
fakeProvider := &metricsfakes.Provider{}
fakeBlockchainHeightGauge := testutilConstructGauge()
fakeBlockstorageCommitTimeHist := testutilConstructHist()
fakeProvider.NewGaugeStub = func(opts metrics.GaugeOpts) metrics.Gauge {
switch opts.Name {
case blockchainHeightOpts.Name:
return fakeBlockchainHeightGauge
default:
return nil
}
}
fakeProvider.NewHistogramStub = func(opts metrics.HistogramOpts) metrics.Histogram {
switch opts.Name {
case blockstorageCommitTimeOpts.Name:
return fakeBlockstorageCommitTimeHist
default:
return nil
}
}

return &testMetricProvider{
fakeProvider,
fakeBlockchainHeightGauge,
fakeBlockstorageCommitTimeHist,
}
}

func testutilConstructGauge() *metricsfakes.Gauge {
fakeGauge := &metricsfakes.Gauge{}
fakeGauge.WithStub = func(lableValues ...string) metrics.Gauge {
return fakeGauge
}
return fakeGauge
}

func testutilConstructHist() *metricsfakes.Histogram {
fakeHist := &metricsfakes.Histogram{}
fakeHist.WithStub = func(lableValues ...string) metrics.Histogram {
return fakeHist
}
return fakeHist
}
12 changes: 9 additions & 3 deletions common/ledger/blkstorage/fsblkstorage/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/assert"
Expand All @@ -48,6 +50,10 @@ type testEnv struct {
}

func newTestEnv(t testing.TB, conf *Conf) *testEnv {
return newTestEnvWithMetricsProvider(t, conf, &disabled.Provider{})
}

func newTestEnvWithMetricsProvider(t testing.TB, conf *Conf, metricsProvider metrics.Provider) *testEnv {
attrsToIndex := []blkstorage.IndexableAttr{
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
Expand All @@ -56,12 +62,12 @@ func newTestEnv(t testing.TB, conf *Conf) *testEnv {
blkstorage.IndexableAttrBlockTxID,
blkstorage.IndexableAttrTxValidationCode,
}
return newTestEnvSelectiveIndexing(t, conf, attrsToIndex)
return newTestEnvSelectiveIndexing(t, conf, attrsToIndex, metricsProvider)
}

func newTestEnvSelectiveIndexing(t testing.TB, conf *Conf, attrsToIndex []blkstorage.IndexableAttr) *testEnv {
func newTestEnvSelectiveIndexing(t testing.TB, conf *Conf, attrsToIndex []blkstorage.IndexableAttr, metricsProvider metrics.Provider) *testEnv {
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
return &testEnv{t, NewProvider(conf, indexConfig).(*FsBlockstoreProvider)}
return &testEnv{t, NewProvider(conf, indexConfig, metricsProvider).(*FsBlockstoreProvider)}
}

func (env *testEnv) Cleanup() {
Expand Down
Loading

0 comments on commit f2cc88c

Please sign in to comment.