Skip to content

Commit dbef5de

Browse files
authored
feat(libp2p): expose libp2p bandwidth metrics (#12402)
This exposes bandwidth metrics via async callback to avoid allocating/reporting metrics on any hot-paths. I'm using open telemetry as we've already setup a bridge for F3 and opencensus is deprecated in favor of open telemetry (so we're going to slowly move over anyways).
1 parent 0225c91 commit dbef5de

File tree

7 files changed

+85
-26
lines changed

7 files changed

+85
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
- Some APIs have changed which may impact users consuming Lotus Gateway code as a library.
4848
- The default value for the `Events.FilterTTL` config option has been reduced from 24h to 1h. This means that filters will expire on a Lotus node after 1 hour of not being accessed by the client.
4949
- feat(f3): F3 has been updated with many performance improvements and additional metrics.
50+
- feat(libp2p): expose libp2p bandwidth metrics via Prometheus.
5051

5152
# 1.28.2 / 2024-08-15
5253

chain/lf3/f3.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ import (
1010
logging "github.com/ipfs/go-log/v2"
1111
pubsub "github.com/libp2p/go-libp2p-pubsub"
1212
"github.com/libp2p/go-libp2p/core/host"
13-
"go.opentelemetry.io/otel"
14-
"go.opentelemetry.io/otel/exporters/prometheus"
15-
"go.opentelemetry.io/otel/sdk/metric"
1613
"go.uber.org/fx"
1714
"golang.org/x/xerrors"
1815

@@ -53,20 +50,6 @@ type F3Params struct {
5350

5451
var log = logging.Logger("f3")
5552

56-
func init() {
57-
// Set up otel to prometheus reporting so that F3 metrics are reported via lotus
58-
// prometheus metrics. This bridge eventually gets picked up by opencensus
59-
// exporter as HTTP handler. This by default registers an otel collector against
60-
// the global prometheus registry. In the future, we should clean up metrics in
61-
// Lotus and move it all to use otel. For now, bridge away.
62-
if bridge, err := prometheus.New(); err != nil {
63-
log.Errorf("could not create the otel prometheus exporter: %v", err)
64-
} else {
65-
provider := metric.NewMeterProvider(metric.WithReader(bridge))
66-
otel.SetMeterProvider(provider)
67-
}
68-
}
69-
7053
func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {
7154

7255
ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ require (
147147
go.opentelemetry.io/otel/bridge/opencensus v1.28.0
148148
go.opentelemetry.io/otel/exporters/jaeger v1.14.0
149149
go.opentelemetry.io/otel/exporters/prometheus v0.50.0
150+
go.opentelemetry.io/otel/metric v1.28.0
150151
go.opentelemetry.io/otel/sdk v1.28.0
151152
go.opentelemetry.io/otel/sdk/metric v1.28.0
152153
go.uber.org/atomic v1.11.0
@@ -323,7 +324,6 @@ require (
323324
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
324325
github.com/zondax/hid v0.9.2 // indirect
325326
github.com/zondax/ledger-go v0.14.3 // indirect
326-
go.opentelemetry.io/otel/metric v1.28.0 // indirect
327327
go.opentelemetry.io/otel/trace v1.28.0 // indirect
328328
go.uber.org/dig v1.17.1 // indirect
329329
go.uber.org/mock v0.4.0 // indirect

metrics/otel_bridge.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package metrics
2+
3+
import (
4+
"go.opentelemetry.io/otel"
5+
"go.opentelemetry.io/otel/exporters/prometheus"
6+
"go.opentelemetry.io/otel/sdk/metric"
7+
)
8+
9+
func init() {
10+
// Set up otel to prometheus reporting so that F3 metrics are reported via lotus
11+
// prometheus metrics. This bridge eventually gets picked up by opencensus
12+
// exporter as HTTP handler. This by default registers an otel collector against
13+
// the global prometheus registry. In the future, we should clean up metrics in
14+
// Lotus and move it all to use otel. For now, bridge away.
15+
if bridge, err := prometheus.New(); err != nil {
16+
log.Errorf("could not create the otel prometheus exporter: %v", err)
17+
} else {
18+
provider := metric.NewMeterProvider(metric.WithReader(bridge))
19+
otel.SetMeterProvider(provider)
20+
}
21+
}

node/modules/lp2p/host.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,6 @@ func Host(mctx helpers.MetricsCtx, buildVersion build.BuildVersion, lc fx.Lifecy
6262
return nil, err
6363
}
6464

65-
lc.Append(fx.Hook{
66-
OnStop: func(ctx context.Context) error {
67-
return h.Close()
68-
},
69-
})
70-
7165
return h, nil
7266
}
7367

node/modules/lp2p/metrics.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package lp2p
2+
3+
import (
4+
"go.opentelemetry.io/otel"
5+
"go.opentelemetry.io/otel/attribute"
6+
"go.opentelemetry.io/otel/metric"
7+
)
8+
9+
var otelmeter = otel.Meter("libp2p")
10+
11+
var attrIdentity = attribute.Key("identity")
12+
var attrProtocolID = attribute.Key("protocol")
13+
var attrDirectionInbound = attribute.String("direction", "inbound")
14+
var attrDirectionOutbound = attribute.String("direction", "outbound")
15+
16+
var otelmetrics = struct {
17+
bandwidth metric.Int64ObservableCounter
18+
}{
19+
bandwidth: must(otelmeter.Int64ObservableCounter("lotus_libp2p_bandwidth",
20+
metric.WithDescription("Libp2p stream bandwidth."),
21+
metric.WithUnit("By"),
22+
)),
23+
}
24+
25+
func must[T any](v T, err error) T {
26+
if err != nil {
27+
panic(err)
28+
}
29+
return v
30+
}

node/modules/lp2p/transport.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package lp2p
22

33
import (
4+
"context"
5+
46
"github.com/libp2p/go-libp2p"
57
"github.com/libp2p/go-libp2p/core/metrics"
8+
"github.com/libp2p/go-libp2p/core/peer"
69
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
710
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
811
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
12+
"go.opentelemetry.io/otel/metric"
13+
"go.uber.org/fx"
914
)
1015

1116
var DefaultTransports = simpleOpt(libp2p.DefaultTransports)
@@ -31,8 +36,33 @@ func Security(enabled, preferTLS bool) interface{} {
3136
}
3237
}
3338

34-
func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) {
39+
func BandwidthCounter(lc fx.Lifecycle, id peer.ID) (opts Libp2pOpts, reporter metrics.Reporter, err error) {
3540
reporter = metrics.NewBandwidthCounter()
3641
opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
37-
return opts, reporter
42+
43+
// Register it with open telemetry. We report by-callback instead of implementing a custom
44+
// bandwidth counter to avoid allocating every time we read/write to a stream (and to stay
45+
// out of the hot path).
46+
//
47+
// Identity is required to ensure this observer observes with unique attributes.
48+
identityAttr := attrIdentity.String(id.String())
49+
registration, err := otelmeter.RegisterCallback(func(ctx context.Context, obs metric.Observer) error {
50+
for p, bw := range reporter.GetBandwidthByProtocol() {
51+
if p == "" {
52+
p = "<unknown>"
53+
}
54+
protoAttr := attrProtocolID.String(string(p))
55+
obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalOut,
56+
metric.WithAttributes(identityAttr, protoAttr, attrDirectionOutbound))
57+
obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalIn,
58+
metric.WithAttributes(identityAttr, protoAttr, attrDirectionInbound))
59+
}
60+
return nil
61+
}, otelmetrics.bandwidth)
62+
if err != nil {
63+
return Libp2pOpts{}, nil, err
64+
}
65+
lc.Append(fx.StopHook(registration.Unregister))
66+
67+
return opts, reporter, nil
3868
}

0 commit comments

Comments
 (0)