diff --git a/internal/k6runner/local.go b/internal/k6runner/local.go
index e8c270b6b..707ab76b3 100644
--- a/internal/k6runner/local.go
+++ b/internal/k6runner/local.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net"
 	"os"
 	"os/exec"
 	"time"
@@ -116,7 +117,14 @@ func (r Local) Run(ctx context.Context, script Script, secretStore SecretStore)
 	cmd.Stdin = nil
 	cmd.Stdout = &stdout
 	cmd.Stderr = &stderr
-	cmd.Env = k6Env(os.Environ())
+
+	port, err := findAvailablePort()
+	if err != nil {
+		return nil, fmt.Errorf("finding available port: %w", err)
+	}
+	// Keep passing the inherited environment through k6Env, and append the screenshot output last:
+	// os/exec uses the last value of a duplicated key, so the parent environment cannot override it.
+	cmd.Env = append(k6Env(os.Environ()), fmt.Sprintf("K6_BROWSER_SCREENSHOTS_OUTPUT=url=http://127.0.0.1:%d", port))
 
 	start := time.Now()
 	logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")
@@ -143,28 +151,15 @@ func (r Local) Run(ctx context.Context, script Script, secretStore SecretStore)
 		return nil, fmt.Errorf("executing k6 script: %w", err)
 	}
 
-	// 256KiB is the maximum payload size for Loki. Set our limit slightly below that to avoid tripping the limit in
-	// case we inject some messages down the line.
-	const maxLogsSizeBytes = 255 * 1024
-
 	// Mimir can also ingest up to 256KiB, but that's JSON-encoded, not promhttp encoded.
 	// To be safe, we limit it to 100KiB promhttp-encoded, hoping than the more verbose json encoding overhead is less
 	// than 2.5x.
 	const maxMetricsSizeBytes = 100 * 1024
 
-	logs, truncated, err := readFileLimit(afs.Fs, logsFn, maxLogsSizeBytes)
+	logs, err := afs.ReadFile(logsFn)
 	if err != nil {
 		return nil, fmt.Errorf("reading k6 logs: %w", err)
 	}
-	if truncated {
-		logger.Warn().
-			Str("filename", logsFn).
-			Int("limitBytes", maxLogsSizeBytes).
-			Msg("Logs output larger than limit, truncating")
-
-		// Leave a truncation notice at the end.
-		fmt.Fprintf(logs, `level=error msg="Log output truncated at %d bytes"`+"\n", maxLogsSizeBytes)
-	}
 
 	metrics, truncated, err := readFileLimit(afs.Fs, metricsFn, maxMetricsSizeBytes)
 	if err != nil {
@@ -177,10 +172,10 @@ func (r Local) Run(ctx context.Context, script Script, secretStore SecretStore)
 			Msg("Metrics output larger than limit, truncating")
 
 		// If we truncate metrics, also leave a truncation notice at the end of the logs.
-		fmt.Fprintf(logs, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes)
+		logs = fmt.Appendf(logs, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes)
 	}
 
-	return &RunResponse{Metrics: metrics.Bytes(), Logs: logs.Bytes()}, errors.Join(err, errorFromLogs(logs.Bytes()))
+	return &RunResponse{Metrics: metrics.Bytes(), Logs: logs}, errors.Join(err, errorFromLogs(logs))
 }
 
 func (r Local) buildK6Args(script Script, metricsFn, logsFn, scriptFn, configFile string) ([]string, error) {
@@ -334,3 +329,24 @@ func createSecretConfigFile(url, token string) (filename string, cleanup func(),
 
 	return tmpFile.Name(), func() { os.Remove(tmpFile.Name()) }, nil
 }
+
+// findAvailablePort returns a TCP port that was free on the loopback interface at the time of the call.
+//
+// The probing listener is closed before returning, so another process may take the port before the
+// caller gets to use it; callers must tolerate that race.
+func findAvailablePort() (int, error) {
+	// Probe loopback only: the port is consumed through a 127.0.0.1 URL, so listening on every
+	// interface (":0") would briefly open a socket on external addresses for no benefit.
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return 0, err
+	}
+	defer ln.Close()
+
+	tcpAddr, ok := ln.Addr().(*net.TCPAddr)
+	if !ok {
+		return 0, fmt.Errorf("unexpected listener address type %T", ln.Addr())
+	}
+
+	return tcpAddr.Port, nil
+}
diff --git a/internal/pkg/loki/loki.go b/internal/pkg/loki/loki.go
index f93548c07..e19abd0d1 100644
--- a/internal/pkg/loki/loki.go
+++ b/internal/pkg/loki/loki.go
@@ -2,6 +2,7 @@ package loki
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
@@ -9,17 +10,67 @@ import (
 	"github.com/grafana/synthetic-monitoring-agent/internal/pkg/prom"
 )
 
+// SplitStreamsIntoChunks splits a slice of streams into chunks that fit within maxBytes.
+// It respects stream boundaries and ensures each chunk is a valid PushRequest.
+func SplitStreamsIntoChunks(streams []logproto.Stream, maxBytes int) [][]logproto.Stream {
+	if len(streams) == 0 {
+		return nil
+	}
+
+	var chunks [][]logproto.Stream
+	var currentChunk []logproto.Stream
+
+	for _, stream := range streams {
+		// Marshal the current chunk plus the candidate stream to measure the resulting request size
+		tempChunk := append(currentChunk, stream)
+		req := &logproto.PushRequest{
+			Streams: tempChunk,
+		}
+		data, err := proto.Marshal(req)
+		if err != nil {
+			// Best effort: the size is unknown, so keep the stream in the current chunk unchecked
+			currentChunk = append(currentChunk, stream)
+			continue
+		}
+
+		// Over the limit: close the current chunk first. A lone oversized stream is never split.
+		if len(data) > maxBytes && len(currentChunk) > 0 {
+			chunks = append(chunks, currentChunk)
+			currentChunk = nil
+		}
+
+		currentChunk = append(currentChunk, stream)
+	}
+
+	// Add the last chunk if it's not empty
+	if len(currentChunk) > 0 {
+		chunks = append(chunks, currentChunk)
+	}
+
+	return chunks
+}
+
 // sendSamples to the remote storage with backoff for recoverable errors.
 func SendStreamsWithBackoff(ctx context.Context, client *prom.Client, streams []logproto.Stream, buf *[]byte) error {
-	req, err := buildStreamsPushRequest(streams, *buf)
-	*buf = req
-	if err != nil {
-		// Failing to build the write request is non-recoverable, since it will
-		// only error if marshaling the proto to bytes fails.
-		return err
+	// Split streams into chunks that fit within maxBytes
+	const maxBytes = 255 * 1024 // 255KiB, slightly below Loki's 256KiB limit
+	chunks := SplitStreamsIntoChunks(streams, maxBytes)
+
+	for _, chunk := range chunks {
+		req, err := buildStreamsPushRequest(chunk, *buf)
+		*buf = req
+		if err != nil {
+			// Failing to build the write request is non-recoverable, since it will
+			// only error if marshaling the proto to bytes fails.
+			return err
+		}
+
+		if err := prom.SendBytesWithBackoff(ctx, client, req); err != nil {
+			return fmt.Errorf("sending events: %w", err)
+		}
 	}
 
-	return prom.SendBytesWithBackoff(ctx, client, req)
+	return nil
 }
 
 func buildStreamsPushRequest(streams []logproto.Stream, buf []byte) ([]byte, error) {
diff --git a/internal/pkg/loki/loki_test.go b/internal/pkg/loki/loki_test.go
new file mode 100644
index 000000000..11e5b4135
--- /dev/null
+++ b/internal/pkg/loki/loki_test.go
@@ -0,0 +1,95 @@
+package loki
+
+import (
+	"testing"
+
+	logproto "github.com/grafana/loki/pkg/push"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSplitStreamsIntoChunks(t *testing.T) {
+	tests := []struct {
+		name     string
+		streams  []logproto.Stream
+		maxBytes int
+		want     [][]logproto.Stream
+	}{
+		{
+			name:     "empty streams",
+			streams:  []logproto.Stream{},
+			maxBytes: 1000,
+			want:     nil,
+		},
+		{
+			name: "single stream fits",
+			streams: []logproto.Stream{
+				{
+					Labels:  `{job="test"}`,
+					Entries: []logproto.Entry{{Line: "test1"}},
+				},
+			},
+			maxBytes: 1000,
+			want: [][]logproto.Stream{
+				{
+					{
+						Labels:  `{job="test"}`,
+						Entries: []logproto.Entry{{Line: "test1"}},
+					},
+				},
+			},
+		},
+		{
+			name: "multiple streams split correctly",
+			streams: []logproto.Stream{
+				{
+					Labels:  `{job="test1"}`,
+					Entries: []logproto.Entry{{Line: "test1"}},
+				},
+				{
+					Labels:  `{job="test2"}`,
+					Entries: []logproto.Entry{{Line: "test2"}},
+				},
+			},
+			maxBytes: 50, // Small enough to force splitting
+			want: [][]logproto.Stream{
+				{
+					{
+						Labels:  `{job="test1"}`,
+						Entries: []logproto.Entry{{Line: "test1"}},
+					},
+				},
+				{
+					{
+						Labels:  `{job="test2"}`,
+						Entries: []logproto.Entry{{Line: "test2"}},
+					},
+				},
+			},
+		},
+		{
+			name: "single oversized stream is kept whole",
+			streams: []logproto.Stream{
+				{
+					Labels:  `{job="test"}`,
+					Entries: []logproto.Entry{{Line: "test1"}},
+				},
+			},
+			maxBytes: 1, // Smaller than any stream; a lone stream is still emitted as its own chunk
+			want: [][]logproto.Stream{
+				{
+					{
+						Labels:  `{job="test"}`,
+						Entries: []logproto.Entry{{Line: "test1"}},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := SplitStreamsIntoChunks(tt.streams, tt.maxBytes)
+			require.Equal(t, tt.want, got)
+		})
+	}
+}
diff --git a/internal/pusher/v2/tenant_pusher.go b/internal/pusher/v2/tenant_pusher.go
index f6eb7bfe8..b92e51b82 100644
--- 
a/internal/pusher/v2/tenant_pusher.go
+++ b/internal/pusher/v2/tenant_pusher.go
@@ -15,6 +15,7 @@ import (
 	logproto "github.com/grafana/loki/pkg/push"
 
 	"github.com/grafana/synthetic-monitoring-agent/internal/model"
+	"github.com/grafana/synthetic-monitoring-agent/internal/pkg/loki"
 	"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
 	sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
 )
@@ -182,7 +183,11 @@ func (p *tenantPusher) publish(payload pusher.Payload) {
 	}
 
 	if len(payload.Streams()) > 0 {
-		p.logs.insert(toRequest(&logproto.PushRequest{Streams: payload.Streams()}, p.options.pool))
+		const maxBytes = 255 * 1024 // 255KiB, slightly below Loki's 256KiB limit
+		chunks := loki.SplitStreamsIntoChunks(payload.Streams(), maxBytes)
+		for _, chunk := range chunks {
+			p.logs.insert(toRequest(&logproto.PushRequest{Streams: chunk}, p.options.pool))
+		}
 	}
 }
 