Sm chunk logs #1323

Draft · wants to merge 3 commits into base: main

40 changes: 23 additions & 17 deletions internal/k6runner/local.go
@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"time"
@@ -116,7 +117,14 @@ func (r Local) Run(ctx context.Context, script Script, secretStore SecretStore)
cmd.Stdin = nil
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmd.Env = k6Env(os.Environ())

port, err := findAvailablePort()
if err != nil {
return nil, fmt.Errorf("finding available port: %w", err)
}
cmd.Env = append([]string{}, fmt.Sprintf("K6_BROWSER_SCREENSHOTS_OUTPUT=url=http://127.0.0.1:%d", port))
cmd.Env = k6Env(cmd.Env)
cmd.Env = append(cmd.Env, os.Environ()...)

start := time.Now()
logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")
@@ -143,28 +151,15 @@ func (r Local) Run(ctx context.Context, script Script, secretStore SecretStore)
return nil, fmt.Errorf("executing k6 script: %w", err)
}

// 256KiB is the maximum payload size for Loki. Set our limit slightly below that to avoid tripping the limit in
// case we inject some messages down the line.
const maxLogsSizeBytes = 255 * 1024

// Mimir can also ingest up to 256KiB, but that's JSON-encoded, not promhttp encoded.
// To be safe, we limit it to 100KiB promhttp-encoded, hoping than the more verbose json encoding overhead is less
// than 2.5x.
const maxMetricsSizeBytes = 100 * 1024

logs, truncated, err := readFileLimit(afs.Fs, logsFn, maxLogsSizeBytes)
logs, err := afs.ReadFile(logsFn)
if err != nil {
return nil, fmt.Errorf("reading k6 logs: %w", err)
}
if truncated {
logger.Warn().
Str("filename", logsFn).
Int("limitBytes", maxLogsSizeBytes).
Msg("Logs output larger than limit, truncating")

// Leave a truncation notice at the end.
fmt.Fprintf(logs, `level=error msg="Log output truncated at %d bytes"`+"\n", maxLogsSizeBytes)
}

metrics, truncated, err := readFileLimit(afs.Fs, metricsFn, maxMetricsSizeBytes)
if err != nil {
@@ -177,10 +172,12 @@ func (r Local) Run(ctx context.Context, script Script, secretStore SecretStore)
Msg("Metrics output larger than limit, truncating")

// If we truncate metrics, also leave a truncation notice at the end of the logs.
fmt.Fprintf(logs, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes)
var metricsNotice bytes.Buffer
fmt.Fprintf(&metricsNotice, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes)
logs = append(logs, metricsNotice.Bytes()...)
}

return &RunResponse{Metrics: metrics.Bytes(), Logs: logs.Bytes()}, errors.Join(err, errorFromLogs(logs.Bytes()))
return &RunResponse{Metrics: metrics.Bytes(), Logs: logs}, errors.Join(err, errorFromLogs(logs))
}

func (r Local) buildK6Args(script Script, metricsFn, logsFn, scriptFn, configFile string) ([]string, error) {
@@ -334,3 +331,12 @@ func createSecretConfigFile(url, token string) (filename string, cleanup func(),

return tmpFile.Name(), func() { os.Remove(tmpFile.Name()) }, nil
}

func findAvailablePort() (int, error) {
addr, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer addr.Close()
return addr.Addr().(*net.TCPAddr).Port, nil
}
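
For reference, the port discovery added above relies on the standard bind-to-port-zero trick: listening on ":0" lets the kernel pick any free port, and the listener reports which one it got. Below is a minimal standalone sketch of the same pattern (the freePort name and the main wrapper are illustrative, not part of this change); note the inherent race window between closing the listener and k6 actually binding the port.

package main

import (
	"fmt"
	"net"
)

// freePort asks the kernel for an unused TCP port by binding to port 0,
// reading the assigned port back, and then releasing the listener.
func freePort() (int, error) {
	l, err := net.Listen("tcp", ":0") // ":0" means "any free port"
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	port, err := freePort()
	if err != nil {
		panic(err)
	}
	// The diff feeds a port obtained this way into the k6 environment.
	fmt.Printf("K6_BROWSER_SCREENSHOTS_OUTPUT=url=http://127.0.0.1:%d\n", port)
}
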
65 changes: 58 additions & 7 deletions internal/pkg/loki/loki.go
@@ -2,24 +2,75 @@ package loki

import (
"context"
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
logproto "github.com/grafana/loki/pkg/push"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/prom"
)

// SplitStreamsIntoChunks splits a slice of streams into chunks that fit within maxBytes.
// It respects stream boundaries and ensures each chunk is a valid PushRequest.
func SplitStreamsIntoChunks(streams []logproto.Stream, maxBytes int) [][]logproto.Stream {
if len(streams) == 0 {
return nil
}

var chunks [][]logproto.Stream
currentChunk := make([]logproto.Stream, 0)

for _, stream := range streams {
// Create a temporary chunk with the new stream to check its size
tempChunk := append(currentChunk, stream)
req := &logproto.PushRequest{
Streams: tempChunk,
}
data, err := proto.Marshal(req)
if err != nil {
// If we can't marshal, just add the stream and hope for the best
currentChunk = append(currentChunk, stream)
continue
}

// If adding this stream would exceed the limit, start a new chunk
if len(data) > maxBytes && len(currentChunk) > 0 {
chunks = append(chunks, currentChunk)
currentChunk = make([]logproto.Stream, 0)
}

currentChunk = append(currentChunk, stream)
}

// Add the last chunk if it's not empty
if len(currentChunk) > 0 {
chunks = append(chunks, currentChunk)
}

return chunks
}

// sendSamples to the remote storage with backoff for recoverable errors.
func SendStreamsWithBackoff(ctx context.Context, client *prom.Client, streams []logproto.Stream, buf *[]byte) error {
req, err := buildStreamsPushRequest(streams, *buf)
*buf = req
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
// Split streams into chunks that fit within maxBytes
const maxBytes = 255 * 1024 // 255 KiB, slightly below Loki's 256 KiB limit
chunks := SplitStreamsIntoChunks(streams, maxBytes)

for _, chunk := range chunks {
req, err := buildStreamsPushRequest(chunk, *buf)
*buf = req
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}

if err := prom.SendBytesWithBackoff(ctx, client, req); err != nil {
return fmt.Errorf("sending events: %w", err)
}
}

return prom.SendBytesWithBackoff(ctx, client, req)
return nil
}

func buildStreamsPushRequest(streams []logproto.Stream, buf []byte) ([]byte, error) {
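
As a rough illustration of the chunking contract introduced above, the sketch below splits two tiny streams with an artificially small limit and shows that each chunk is a marshalable PushRequest of its own. It is not part of the change, assumes the SplitStreamsIntoChunks signature shown in the diff, and only compiles from inside the agent module since the package is internal.

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	logproto "github.com/grafana/loki/pkg/push"

	"github.com/grafana/synthetic-monitoring-agent/internal/pkg/loki"
)

func main() {
	// Two small streams; a tiny maxBytes forces each into its own chunk.
	streams := []logproto.Stream{
		{Labels: `{job="a"}`, Entries: []logproto.Entry{{Line: "first"}}},
		{Labels: `{job="b"}`, Entries: []logproto.Entry{{Line: "second"}}},
	}

	chunks := loki.SplitStreamsIntoChunks(streams, 50)
	for i, chunk := range chunks {
		// Every chunk is a valid PushRequest in its own right.
		data, err := proto.Marshal(&logproto.PushRequest{Streams: chunk})
		if err != nil {
			panic(err)
		}
		fmt.Printf("chunk %d: %d stream(s), %d proto bytes\n", i, len(chunk), len(data))
	}
}

One caveat worth keeping in mind: a single stream that is larger than maxBytes on its own still comes back as one oversized chunk, so the limit is best effort rather than a hard guarantee.
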
77 changes: 77 additions & 0 deletions internal/pkg/loki/loki_test.go
@@ -0,0 +1,77 @@
package loki

import (
"testing"

logproto "github.com/grafana/loki/pkg/push"
"github.com/stretchr/testify/require"
)

func TestSplitStreamsIntoChunks(t *testing.T) {
tests := []struct {
name string
streams []logproto.Stream
maxBytes int
want [][]logproto.Stream
}{
{
name: "empty streams",
streams: []logproto.Stream{},
maxBytes: 1000,
want: nil,
},
{
name: "single stream fits",
streams: []logproto.Stream{
{
Labels: `{job="test"}`,
Entries: []logproto.Entry{{Line: "test1"}},
},
},
maxBytes: 1000,
want: [][]logproto.Stream{
{
{
Labels: `{job="test"}`,
Entries: []logproto.Entry{{Line: "test1"}},
},
},
},
},
{
name: "multiple streams split correctly",
streams: []logproto.Stream{
{
Labels: `{job="test1"}`,
Entries: []logproto.Entry{{Line: "test1"}},
},
{
Labels: `{job="test2"}`,
Entries: []logproto.Entry{{Line: "test2"}},
},
},
maxBytes: 50, // Small enough to force splitting
want: [][]logproto.Stream{
{
{
Labels: `{job="test1"}`,
Entries: []logproto.Entry{{Line: "test1"}},
},
},
{
{
Labels: `{job="test2"}`,
Entries: []logproto.Entry{{Line: "test2"}},
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := SplitStreamsIntoChunks(tt.streams, tt.maxBytes)
require.Equal(t, tt.want, got)
})
}
}
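
To sanity-check the maxBytes: 50 threshold used above, the marshaled size of a single-stream request can be printed directly; that size is exactly what SplitStreamsIntoChunks compares against maxBytes. A throwaway snippet along these lines (hypothetical test, same gogo proto and logproto packages as the diff) could live next to the table-driven test:

package loki

import (
	"testing"

	"github.com/gogo/protobuf/proto"
	logproto "github.com/grafana/loki/pkg/push"
)

// TestSingleStreamRequestSize logs how large one marshaled single-stream
// PushRequest is, i.e. the quantity the splitter measures per candidate chunk.
func TestSingleStreamRequestSize(t *testing.T) {
	req := &logproto.PushRequest{
		Streams: []logproto.Stream{
			{Labels: `{job="test1"}`, Entries: []logproto.Entry{{Line: "test1"}}},
		},
	}
	data, err := proto.Marshal(req)
	if err != nil {
		t.Fatalf("marshaling push request: %v", err)
	}
	t.Logf("single-stream request marshals to %d bytes", len(data))
}

Running it with go test -v prints the measured size; the table-driven test above relies on one such stream fitting under 50 bytes while two together do not.
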
7 changes: 6 additions & 1 deletion internal/pusher/v2/tenant_pusher.go
@@ -15,6 +15,7 @@ import (

logproto "github.com/grafana/loki/pkg/push"
"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/loki"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
)
@@ -182,7 +183,11 @@ func (p *tenantPusher) publish(payload pusher.Payload) {
}

if len(payload.Streams()) > 0 {
p.logs.insert(toRequest(&logproto.PushRequest{Streams: payload.Streams()}, p.options.pool))
const maxBytes = 255 * 1024 // 255 KiB, matching Loki's limit
chunks := loki.SplitStreamsIntoChunks(payload.Streams(), maxBytes)
for _, chunk := range chunks {
p.logs.insert(toRequest(&logproto.PushRequest{Streams: chunk}, p.options.pool))
}
}
}
