Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions cmd/synthetic-monitoring-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/grpclog"

"github.com/grafana/synthetic-monitoring-agent/internal/adhoc"
"github.com/grafana/synthetic-monitoring-agent/internal/cals"
"github.com/grafana/synthetic-monitoring-agent/internal/checks"
"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/http"
Expand Down Expand Up @@ -314,6 +315,7 @@ func run(args []string, stdout io.Writer) error {

publisher := publisherFactory(ctx, tm, zl.With().Str("subsystem", "publisher").Str("version", config.SelectedPublisher).Logger(), promRegisterer)
limits := limits.NewTenantLimits(tm)
cals := cals.NewTenantCostAttributionLabels(tm)
secrets := secrets.NewTenantSecrets(tm, zl.With().Str("subsystem", "secretstore").Logger())

telemetry := telemetry.NewTelemeter(
Expand All @@ -324,20 +326,21 @@ func run(args []string, stdout io.Writer) error {
)

checksUpdater, err := checks.NewUpdater(checks.UpdaterOptions{
Conn: conn,
Logger: zl.With().Str("subsystem", "updater").Logger(),
Backoff: newConnectionBackoff(),
Publisher: publisher,
TenantCh: tenantCh,
IsConnected: readynessHandler.Set,
PromRegisterer: promRegisterer,
Features: features,
K6Runner: k6Runner,
ScraperFactory: scraper.New,
TenantLimits: limits,
TenantSecrets: secrets,
Telemeter: telemetry,
UsageReporter: usageReporter,
Conn: conn,
Logger: zl.With().Str("subsystem", "updater").Logger(),
Backoff: newConnectionBackoff(),
Publisher: publisher,
TenantCh: tenantCh,
IsConnected: readynessHandler.Set,
PromRegisterer: promRegisterer,
Features: features,
K6Runner: k6Runner,
ScraperFactory: scraper.New,
TenantLimits: limits,
TenantSecrets: secrets,
Telemeter: telemetry,
UsageReporter: usageReporter,
CostAttributionLabels: cals,
})
if err != nil {
return fmt.Errorf("cannot create checks updater: %w", err)
Expand Down
37 changes: 37 additions & 0 deletions internal/cals/tenant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cals

import (
"context"

"github.com/grafana/synthetic-monitoring-agent/internal/model"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
)

type TenantProvider interface {
GetTenant(context.Context, *sm.TenantInfo) (*sm.Tenant, error)
}

// TenantCostAttributionLabels has a TenantProvider that pulls data about a specific tenant
type TenantCostAttributionLabels struct {
provider TenantProvider
}

// NewTenantCostAttributionLabels is a helper method to create a NewTenantCostAttributionLabels provider
func NewTenantCostAttributionLabels(provider TenantProvider) *TenantCostAttributionLabels {
return &TenantCostAttributionLabels{
provider: provider,
}
}

// CostAttributionLabels will call TenantProvider.GetTenant to search for a specific tenant and returns Tenant.CostAttributionLabel
func (tcal TenantCostAttributionLabels) CostAttributionLabels(ctx context.Context, tenantID model.GlobalID) ([]string, error) {
tenant, err := tcal.provider.GetTenant(ctx, &sm.TenantInfo{
Id: int64(tenantID),
})

if err != nil {
return nil, err
}

return tenant.CostAttributionLabels, nil
}
35 changes: 20 additions & 15 deletions internal/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

logproto "github.com/grafana/loki/pkg/push"

"github.com/grafana/synthetic-monitoring-agent/internal/cals"
"github.com/grafana/synthetic-monitoring-agent/internal/error_types"
"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
Expand Down Expand Up @@ -84,6 +85,7 @@ type Updater struct {
tenantSecrets *secrets.TenantSecrets
telemeter *telemetry.Telemeter
usageReporter usage.Reporter
tenantCals *cals.TenantCostAttributionLabels
}

type apiInfo struct {
Expand All @@ -106,20 +108,21 @@ type (
)

type UpdaterOptions struct {
Conn *grpc.ClientConn
Logger zerolog.Logger
Backoff Backoffer
Publisher pusher.Publisher
TenantCh chan<- sm.Tenant
IsConnected func(bool)
PromRegisterer prometheus.Registerer
Features feature.Collection
K6Runner k6runner.Runner
ScraperFactory scraper.Factory
TenantLimits *limits.TenantLimits
Telemeter *telemetry.Telemeter
TenantSecrets *secrets.TenantSecrets
UsageReporter usage.Reporter
Conn *grpc.ClientConn
Logger zerolog.Logger
Backoff Backoffer
Publisher pusher.Publisher
TenantCh chan<- sm.Tenant
IsConnected func(bool)
PromRegisterer prometheus.Registerer
Features feature.Collection
K6Runner k6runner.Runner
ScraperFactory scraper.Factory
TenantLimits *limits.TenantLimits
Telemeter *telemetry.Telemeter
TenantSecrets *secrets.TenantSecrets
UsageReporter usage.Reporter
CostAttributionLabels *cals.TenantCostAttributionLabels
}

func NewUpdater(opts UpdaterOptions) (*Updater, error) {
Expand Down Expand Up @@ -253,6 +256,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
scrapesCounter: scrapesCounter,
},
usageReporter: opts.UsageReporter,
tenantCals: opts.CostAttributionLabels,
}, nil
}

Expand Down Expand Up @@ -935,8 +939,9 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Ch
c.logger,
metrics,
c.k6Runner,
c.tenantLimits, c.telemeter, c.tenantSecrets,
c.tenantLimits, c.telemeter, c.tenantSecrets, c.tenantCals,
)

if err != nil {
return fmt.Errorf("cannot create new scraper: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/checks/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/cals"
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -474,6 +475,7 @@ func testScraperFactory(ctx context.Context, check model.Check, publisher pusher
labelsLimiter scraper.LabelsLimiter,
telemeter *telemetry.Telemeter,
secretStore *secrets.TenantSecrets,
cals *cals.TenantCostAttributionLabels,
) (*scraper.Scraper, error) {
return scraper.NewWithOpts(
ctx,
Expand Down
51 changes: 37 additions & 14 deletions internal/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/cals"
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"

kitlog "github.com/go-kit/kit/log" //nolint:staticcheck // TODO(mem): replace in BBE
Expand Down Expand Up @@ -58,6 +59,10 @@ type LabelsLimiter interface {
LogLabels(ctx context.Context, tenantID model.GlobalID) (int, error)
}

type TenantCals interface {
CostAttributionLabels(ctx context.Context, tenantID model.GlobalID) ([]string, error)
}

type Telemeter interface {
AddExecution(e telemetry.Execution)
}
Expand All @@ -77,6 +82,7 @@ type Scraper struct {
summaries map[uint64]prometheus.Summary
histograms map[uint64]prometheus.Histogram
telemeter Telemeter
cals TenantCals
}

type Factory func(
Expand All @@ -88,6 +94,7 @@ type Factory func(
labelsLimiter LabelsLimiter,
telemeter *telemetry.Telemeter,
secretStore *secrets.TenantSecrets,
costAttributionLabels *cals.TenantCostAttributionLabels,
) (*Scraper, error)

type (
Expand Down Expand Up @@ -122,28 +129,31 @@ func New(
labelsLimiter LabelsLimiter,
telemeter *telemetry.Telemeter,
secretStore *secrets.TenantSecrets,
cals *cals.TenantCostAttributionLabels,
) (*Scraper, error) {
return NewWithOpts(ctx, check, ScraperOpts{
Probe: probe,
Publisher: publisher,
Logger: logger,
Metrics: metrics,
ProbeFactory: prober.NewProberFactory(k6runner, probe.Id, features, secretStore),
LabelsLimiter: labelsLimiter,
Telemeter: telemeter,
Probe: probe,
Publisher: publisher,
Logger: logger,
Metrics: metrics,
ProbeFactory: prober.NewProberFactory(k6runner, probe.Id, features, secretStore),
LabelsLimiter: labelsLimiter,
Telemeter: telemeter,
CostAttributionLabels: cals,
})
}

var _ Factory = New

type ScraperOpts struct {
Probe sm.Probe
Publisher pusher.Publisher
Logger zerolog.Logger
Metrics Metrics
ProbeFactory prober.ProberFactory
LabelsLimiter LabelsLimiter
Telemeter Telemeter
Probe sm.Probe
Publisher pusher.Publisher
Logger zerolog.Logger
Metrics Metrics
ProbeFactory prober.ProberFactory
LabelsLimiter LabelsLimiter
Telemeter Telemeter
CostAttributionLabels TenantCals
}

func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scraper, error) {
Expand Down Expand Up @@ -181,6 +191,7 @@ func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scr
summaries: make(map[uint64]prometheus.Summary),
histograms: make(map[uint64]prometheus.Histogram),
telemeter: opts.Telemeter,
cals: opts.CostAttributionLabels,
}, nil
}

Expand Down Expand Up @@ -289,6 +300,18 @@ func (h *scrapeHandler) scrape(ctx context.Context, t time.Time) {
})
}

costAttributionLabels, err := h.scraper.cals.CostAttributionLabels(ctx, h.payload.tenantId)
if err != nil {
h.scraper.logger.Error().
Int64("tenantId", int64(h.payload.tenantId)).
Msg("Could not load cals")
}

h.scraper.logger.Debug().
Int64("tenantId", int64(h.payload.tenantId)).
Int("costAttributionLabelsCount", len(costAttributionLabels)).
Msgf("Cost Atttribution Labels: %v", costAttributionLabels)

// If we are dropping the data in case of errors, we should not count that execution.
h.scraper.telemeter.AddExecution(telemetry.Execution{
LocalTenantID: h.scraper.check.TenantId,
Expand Down
11 changes: 11 additions & 0 deletions internal/scraper/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,14 @@ func (l testLabelsLimiter) LogLabels(ctx context.Context, tenantID model.GlobalI
return l.maxLogLabels, nil
}

type testCalTenants struct {
costAttributionLabels []string
}

func (t testCalTenants) CostAttributionLabels(_ context.Context, tenantID model.GlobalID) ([]string, error) {
return t.costAttributionLabels, nil
}

//nolint:gocyclo
func TestScraperCollectData(t *testing.T) {
const (
Expand Down Expand Up @@ -1875,6 +1883,9 @@ func TestScraperRun(t *testing.T) {
maxLogLabels: 15,
},
Telemeter: testTelemeter,
CostAttributionLabels: testCalTenants{
costAttributionLabels: []string{"testing", "you"},
},
})

require.NoError(t, err)
Expand Down
Loading