Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ import (
_ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http
_ "github.com/grafana/alloy/internal/component/pyroscope/relabel" // Import pyroscope.relabel
_ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write
_ "github.com/grafana/alloy/internal/component/pyroscope/write/glue" // Import pyroscope.write
_ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http
_ "github.com/grafana/alloy/internal/component/remote/kubernetes/configmap" // Import remote.kubernetes.configmap
_ "github.com/grafana/alloy/internal/component/remote/kubernetes/secret" // Import remote.kubernetes.secret
Expand Down
27 changes: 27 additions & 0 deletions internal/component/pyroscope/util/glue/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package glue

import (
"context"

"github.com/grafana/alloy/internal/component"
)

type GenericComponent[ARGS any] interface {
Run(ctx context.Context) error
Update(args ARGS) error
}

// GenericComponentGlue is a helper to allow writing alloy components without depending on component.Arguments package
// as a tiny nice bonus, the Update method is type-safe and does not require casting interface{} to ARGS
type GenericComponentGlue[ARGS any] struct {
Impl GenericComponent[ARGS]
}

func (c *GenericComponentGlue[ARGS]) Run(ctx context.Context) error {
return c.Impl.Run(ctx)
}

func (c *GenericComponentGlue[ARGS]) Update(args component.Arguments) error {
ga := args.(ARGS)
return c.Impl.Update(ga)
}
18 changes: 18 additions & 0 deletions internal/component/pyroscope/util/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package metrics

import "github.com/prometheus/client_golang/prometheus"

// MustRegisterOrGet is a copy of util.MustRegisterOrGet but does not bring hundreds of transitive dependencies
// A little copying is better than a little dependency.
// https://www.youtube.com/watch?v=PAAkCSZUG1c&t=568s
// The Previous attempt to fix this globally stalled: https://github.com/grafana/alloy/pull/4369
// So for now it is in the pyroscope subpackage
func MustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
if err := reg.Register(c); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
return are.ExistingCollector
}
panic(err)
}
return c
}
23 changes: 23 additions & 0 deletions internal/component/pyroscope/util/testlog/test_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package testlog

import (
"os"
"testing"

"github.com/go-kit/log"
)

// TestLogger is a copy of util.TestLogger but does not bring hundreds of transitive dependencies
// A little copying is better than a little dependency.
// https://www.youtube.com/watch?v=PAAkCSZUG1c&t=568s
// The Previous attempt to fix this globally stalled: https://github.com/grafana/alloy/pull/4369
// So for now it is in the pyroscope subpackage
func TestLogger(t testing.TB) log.Logger {
t.Helper()
l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr))
l = log.WithPrefix(l,
"test", t.Name(),
"ts", log.DefaultTimestampUTC,
)
return l
}
41 changes: 41 additions & 0 deletions internal/component/pyroscope/write/glue/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package glue

import (
"github.com/grafana/alloy/internal/alloyseed"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/pyroscope/util/glue"
"github.com/grafana/alloy/internal/component/pyroscope/write"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/useragent"
)

func init() {
component.Register(component.Registration{
Name: "pyroscope.write",
Stability: featuregate.StabilityGenerallyAvailable,
Args: write.Arguments{},
Exports: write.Exports{},
Build: func(o component.Options, c component.Arguments) (component.Component, error) {
tracer := o.Tracer.Tracer("pyroscope.write")
args := c.(write.Arguments)
userAgent := useragent.Get()
uid := alloyseed.Get().UID

gc, err := write.New(
o.Logger,
tracer,
o.Registerer,
func(exports write.Exports) {
o.OnStateChange(exports)
},
userAgent,
uid,
args,
)
if err != nil {
return nil, err
}
return &glue.GenericComponentGlue[write.Arguments]{Impl: gc}, nil
},
})
}
14 changes: 7 additions & 7 deletions internal/component/pyroscope/write/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package write

import (
"github.com/grafana/alloy/internal/util"
pyrometricsutil "github.com/grafana/alloy/internal/component/pyroscope/util/metrics"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -43,12 +43,12 @@ func newMetrics(reg prometheus.Registerer) *metrics {
}

if reg != nil {
m.sentBytes = util.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentProfiles = util.MustRegisterOrGet(reg, m.sentProfiles).(*prometheus.CounterVec)
m.droppedProfiles = util.MustRegisterOrGet(reg, m.droppedProfiles).(*prometheus.CounterVec)
m.retries = util.MustRegisterOrGet(reg, m.retries).(*prometheus.CounterVec)
m.latency = util.MustRegisterOrGet(reg, m.latency).(*prometheus.HistogramVec)
m.sentBytes = pyrometricsutil.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
m.droppedBytes = pyrometricsutil.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentProfiles = pyrometricsutil.MustRegisterOrGet(reg, m.sentProfiles).(*prometheus.CounterVec)
m.droppedProfiles = pyrometricsutil.MustRegisterOrGet(reg, m.droppedProfiles).(*prometheus.CounterVec)
m.retries = pyrometricsutil.MustRegisterOrGet(reg, m.retries).(*prometheus.CounterVec)
m.latency = pyrometricsutil.MustRegisterOrGet(reg, m.latency).(*prometheus.HistogramVec)
}

return m
Expand Down
40 changes: 3 additions & 37 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ import (
"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/alloy/internal/alloyseed"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/config"
"github.com/grafana/alloy/internal/component/pyroscope"
"github.com/grafana/alloy/internal/component/pyroscope/util"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/useragent"
"github.com/grafana/dskit/backoff"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
Expand All @@ -47,36 +43,8 @@ var (
},
}
}
_ component.Component = (*Component)(nil)
)

func init() {
component.Register(component.Registration{
Name: "pyroscope.write",
Stability: featuregate.StabilityGenerallyAvailable,
Args: Arguments{},
Exports: Exports{},
Build: func(o component.Options, c component.Arguments) (component.Component, error) {
tracer := o.Tracer.Tracer("pyroscope.write")
args := c.(Arguments)
userAgent := useragent.Get()
uid := alloyseed.Get().UID

return New(
o.Logger,
tracer,
o.Registerer,
func(exports Exports) {
o.OnStateChange(exports)
},
userAgent,
uid,
args,
)
},
})
}

// Arguments represents the input state of the pyroscope.write
// component.
type Arguments struct {
Expand Down Expand Up @@ -180,18 +148,16 @@ func New(
}, nil
}

var _ component.Component = (*Component)(nil)

// Run implements Component.
func (c *Component) Run(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}

// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
c.cfg = newConfig.(Arguments)
receiver, err := newFanOut(c.logger, c.tracer, newConfig.(Arguments), c.metrics, c.userAgent, c.uid)
func (c *Component) Update(newConfig Arguments) error {
c.cfg = newConfig
receiver, err := newFanOut(c.logger, c.tracer, newConfig, c.metrics, c.userAgent, c.uid)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/component/pyroscope/write/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"connectrpc.com/connect"
"github.com/grafana/alloy/internal/component/pyroscope"
"github.com/grafana/alloy/internal/util"
pyrotestlogger "github.com/grafana/alloy/internal/component/pyroscope/util/testlog"
"github.com/grafana/alloy/syntax"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
Expand Down Expand Up @@ -91,7 +91,7 @@ func Test_Write_FanOut(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
c, err := New(
util.TestAlloyLogger(t),
pyrotestlogger.TestLogger(t),
noop.Tracer{},
prometheus.NewRegistry(),
func(e Exports) {
Expand Down Expand Up @@ -170,7 +170,7 @@ func Test_Write_Update(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
c, err := New(
util.TestAlloyLogger(t),
pyrotestlogger.TestLogger(t),
noop.Tracer{},
prometheus.NewRegistry(),
func(e Exports) {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (s *AppendIngestTestSuite) newComponent(argument Arguments) {
var err error
s.arguments = argument
s.component, err = New(
util.TestAlloyLogger(s.T()),
pyrotestlogger.TestLogger(s.T()),
noop.Tracer{},
prometheus.NewRegistry(),
func(e Exports) {
Expand Down Expand Up @@ -594,7 +594,7 @@ func Test_Write_FanOut_ValidateLabels(t *testing.T) {
var export Exports
wg.Add(1)
c, err := New(
util.TestAlloyLogger(t),
pyrotestlogger.TestLogger(t),
noop.Tracer{},
prometheus.NewRegistry(),
func(e Exports) {
Expand Down