Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ require (
github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b
github.com/cespare/cp v1.1.1 // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/elastic/gosigar v0.10.4 // indirect
github.com/ethereum/go-ethereum v1.9.6
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/google/go-cmp v0.3.1
github.com/huin/goupnp v1.0.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
Expand Down
12 changes: 12 additions & 0 deletions instrumentation/metric/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ func (g *Gauge) Update(i int64) {
atomic.StoreInt64(&g.value, i)
}

func (g *Gauge) UpdateMax(i int64) {
for { // retry indefinitely (?!) if optimistic lock fails
old := g.Value()
if old >= i {
return
}
if atomic.CompareAndSwapInt64(&g.value, old, i) { // optimistic lock
return
}
}
}

func (g *Gauge) UpdateUInt32(i int32) {
atomic.StoreInt64(&g.value, int64(i))
}
Expand Down
81 changes: 68 additions & 13 deletions instrumentation/metric/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,27 @@ import (
"time"
)

type Histogram struct {
type HistogramTimeDiff struct {
namedMetric
histo *hdrhistogram.WindowedHistogram
overflowCount int64
}

func newHistogram(name string, max int64, n int) *Histogram {
return &Histogram{
func newHistogramTimeDiff(name string, max int64, n int) *HistogramTimeDiff {
return &HistogramTimeDiff{
namedMetric: namedMetric{name: name},
histo: hdrhistogram.NewWindowed(n, 0, max, 1),
}
}

func (h *Histogram) RecordSince(t time.Time) {
func (h *HistogramTimeDiff) RecordSince(t time.Time) {
d := time.Since(t).Nanoseconds()
if err := h.histo.Current.RecordValue(int64(d)); err != nil {
atomic.AddInt64(&h.overflowCount, 1)
}
}

func (h *Histogram) Record(measurement int64) {
if err := h.histo.Current.RecordValue(measurement); err != nil {
atomic.AddInt64(&h.overflowCount, 1)
}
}

func (h *Histogram) String() string {
func (h *HistogramTimeDiff) String() string {
var errorRate float64
histo := h.histo.Current

Expand All @@ -63,7 +57,7 @@ func (h *Histogram) String() string {
errorRate)
}

func (h *Histogram) Export() exportedMetric {
func (h *HistogramTimeDiff) Export() exportedMetric {
histo := h.histo.Merge()

return &histogramExport{
Expand All @@ -78,6 +72,67 @@ func (h *Histogram) Export() exportedMetric {
}
}

func (h *Histogram) Rotate() {
func (h *HistogramTimeDiff) Rotate() {
h.histo.Rotate()
}

type HistogramInt64 struct { // TODO DRY HistogramTimeDiff is similar
namedMetric
histo *hdrhistogram.WindowedHistogram
overflowCount int64
}

func newHistogramInt64(name string, max int64, n int) *HistogramInt64 {
return &HistogramInt64{
namedMetric: namedMetric{name: name},
histo: hdrhistogram.NewWindowed(n, 0, max, 1),
}
}

func (h *HistogramInt64) Record(measurement int64) {
if err := h.histo.Current.RecordValue(measurement); err != nil {
atomic.AddInt64(&h.overflowCount, 1)
}
}

func (h *HistogramInt64) String() string {
var errorRate float64
histo := h.histo.Current

if h.overflowCount > 0 {
errorRate = float64(histo.TotalCount()) / float64(h.overflowCount)
} else {
errorRate = 0
}

return fmt.Sprintf(
"metric %s: [min=%d, p50=%d, p95=%d, p99=%d, max=%d, avg=%f, samples=%d, error rate=%f]\n",
h.name,
histo.Min(),
histo.ValueAtQuantile(50),
histo.ValueAtQuantile(95),
histo.ValueAtQuantile(99),
histo.Max(),
histo.Mean(),
histo.TotalCount(),
errorRate)
}

func (h *HistogramInt64) Export() exportedMetric {
histo := h.histo.Merge()

return &histogramExport{
h.name,
float64(histo.Min()),
float64(histo.ValueAtQuantile(50)),
float64(histo.ValueAtQuantile(95)),
float64(histo.ValueAtQuantile(99)),
float64(histo.Max()),
histo.Mean(),
histo.TotalCount(),
}
}

func (h *HistogramInt64) Rotate() {
h.histo.Rotate()
}
4 changes: 2 additions & 2 deletions instrumentation/metric/histogram_prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
"time"
)

// This does NOT test correctness of Histogram
// This does NOT test correctness of NewHistogramInt64
// (e.g. that calculation of quantiles for given values is correct)
// It only verifies the accurate conversion of metric values into Prometheus format.
func Test_PrometheusFormatterForHistogramWithLabels(t *testing.T) {
r := NewRegistry().WithVirtualChainId(100000)
const SEC = int64(time.Second)
histo := r.NewHistogram("Some.Latency", 1000*SEC)
histo := r.NewHistogramInt64("Some.Latency", 1000*SEC)

for i := 0; i < 1000; i++ {
histo.Record(int64(i) * SEC)
Expand Down
28 changes: 19 additions & 9 deletions instrumentation/metric/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ import (
const ROTATE_INTERVAL = 30 * time.Second
const AGGREGATION_SPAN = 10 * time.Minute

type rotator interface {
Rotate()
}

type Factory interface {
NewHistogram(name string, maxValue int64) *Histogram
NewLatency(name string, maxDuration time.Duration) *Histogram
NewHistogramTimeDiff(name string, maxValue int64) *HistogramTimeDiff
NewHistogramInt64(name string, maxValue int64) *HistogramInt64
NewLatency(name string, maxDuration time.Duration) *HistogramTimeDiff
NewGauge(name string) *Gauge
NewRate(name string) *Rate
NewText(name string, defaultValue ...string) *Text
Expand Down Expand Up @@ -123,14 +128,20 @@ func (r *inMemoryRegistry) NewGauge(name string) *Gauge {
return g
}

func (r *inMemoryRegistry) NewLatency(name string, maxDuration time.Duration) *Histogram {
h := newHistogram(name, maxDuration.Nanoseconds(), int(AGGREGATION_SPAN/ROTATE_INTERVAL))
func (r *inMemoryRegistry) NewLatency(name string, maxDuration time.Duration) *HistogramTimeDiff {
h := newHistogramTimeDiff(name, maxDuration.Nanoseconds(), int(AGGREGATION_SPAN/ROTATE_INTERVAL))
r.register(h)
return h
}

func (r *inMemoryRegistry) NewHistogramTimeDiff(name string, maxValue int64) *HistogramTimeDiff {
h := newHistogramTimeDiff(name, maxValue, int(AGGREGATION_SPAN/ROTATE_INTERVAL))
r.register(h)
return h
}

func (r *inMemoryRegistry) NewHistogram(name string, maxValue int64) *Histogram {
h := newHistogram(name, maxValue, int(AGGREGATION_SPAN/ROTATE_INTERVAL))
func (r *inMemoryRegistry) NewHistogramInt64(name string, maxValue int64) *HistogramInt64 {
h := newHistogramInt64(name, maxValue, int(AGGREGATION_SPAN/ROTATE_INTERVAL))
r.register(h)
return h
}
Expand Down Expand Up @@ -170,9 +181,8 @@ func (r *inMemoryRegistry) PeriodicallyRotate(ctx context.Context, logger log.Lo
r.mu.Lock()
defer r.mu.Unlock()
for _, m := range r.mu.metrics {
switch m.(type) {
case *Histogram: // only Histograms currently require rotating
m.(*Histogram).Rotate()
if rotator, ok := m.(rotator); ok { // TODO check how much faster it is to check for concrete types
rotator.Rotate()
}
}
}, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,26 @@ import (
)

type metrics struct {
sizeOnDisk *metric.Gauge
sizeOnDisk *metric.Gauge
readBlockSize *metric.HistogramInt64
writeBlockSize *metric.HistogramInt64
readRate *metric.Rate
writeRate *metric.Rate
readMaxBlockSize *metric.Gauge
writeMaxBlockSize *metric.Gauge
}

const blocksFilename = "blocks"

func newMetrics(m metric.Factory) *metrics {
return &metrics{
sizeOnDisk: m.NewGauge("BlockStorage.FileSystemSize.Bytes"),
sizeOnDisk: m.NewGauge("BlockStorage.FileSystemSize.Bytes"),
readBlockSize: m.NewHistogramInt64("BlockStorage.Block.Read.Size", 10*1024*1024), // rolling window analysis of block sizes up to 10MB
writeBlockSize: m.NewHistogramInt64("BlockStorage.Block.Write.Size", 10*1024*1024), // rolling window analysis of block sizes up to 10MB
readRate: m.NewRate("BlockStorage.Block.Read.Rate"),
writeRate: m.NewRate("BlockStorage.Block.Write.Rate"),
readMaxBlockSize: m.NewGauge("BlockStorage.Block.Read.Max.Bytes"),
writeMaxBlockSize: m.NewGauge("BlockStorage.Block.Write.Max.Bytes"),
}
}

Expand Down Expand Up @@ -227,7 +239,7 @@ func newFileBlockWriter(file *os.File, codec blockCodec, nextBlockOffset int64)

func buildIndex(r io.Reader, firstBlockOffset int64, logger log.Logger, c blockCodec) (*blockHeightIndex, error) {
bhIndex := newBlockHeightIndex(logger, firstBlockOffset)
offset := int64(firstBlockOffset)
offset := firstBlockOffset
for {
aBlock, blockSize, err := c.decode(r)
if err != nil {
Expand Down Expand Up @@ -269,6 +281,10 @@ func (f *BlockPersistence) WriteNextBlock(blockPair *protocol.BlockPairContainer
}

f.metrics.sizeOnDisk.Add(int64(n))
f.metrics.writeBlockSize.Record(int64(n))
f.metrics.writeMaxBlockSize.UpdateMax(int64(n))
f.metrics.writeRate.Measure(1)

return true, f.bhIndex.getLastBlockHeight(), nil
}

Expand All @@ -295,6 +311,7 @@ func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint
}
page := make([]*protocol.BlockPairContainer, 0, pageSize)
// TODO: Gad allow update of sequence height inside page
// TODO extract this loop to a fetchBlocksFromFile method that will not seek every time
for height := fromHeight; height <= toHeight; height++ {
aBlock, err := f.fetchBlockFromFile(height, file)
if err != nil {
Expand All @@ -319,14 +336,22 @@ func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint
func (f *BlockPersistence) fetchBlockFromFile(height primitives.BlockHeight, file *os.File) (*protocol.BlockPairContainer, error) {
initialOffset, ok := f.bhIndex.fetchBlockOffset(height)
if !ok {
return nil, fmt.Errorf("failed to find requested block %d", uint64(height))
return nil, fmt.Errorf("failed to find requested block %d", height)
}
newOffset, err := file.Seek(initialOffset, io.SeekStart)
if newOffset != initialOffset || err != nil {
return nil, errors.Wrapf(err, "failed to seek in blocks file to position %v", initialOffset)
}
aBlock, _, err := f.codec.decode(file)
return aBlock, err

aBlock, bytes, err := f.codec.decode(file)
if err != nil {
return nil, errors.Wrapf(err, "failed to decode block %d at position %v", height, initialOffset)
}

f.metrics.readBlockSize.Record(int64(bytes))
f.metrics.readRate.Measure(1)
f.metrics.readMaxBlockSize.UpdateMax(int64(bytes))
return aBlock, nil
}

func (f *BlockPersistence) GetLastBlockHeight() (primitives.BlockHeight, error) {
Expand Down
Loading