Skip to content

fix: Don't stop publisher if we hit Loki's stream limit #1356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/pusher/v2/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
errKindTenant // A problem with the tenant remotes. Fetch the tenant again.
errKindFatal // There is a problem that can't be fixed by fetching the tenant.
errKindTerminated // Push terminated (context canceled)
errKindLimit // There is a problem with the limits of the tenant. Discard it.
)

func (k errKind) String() string {
Expand All @@ -39,6 +40,8 @@ func (k errKind) String() string {
return "fatal error"
case errKindTerminated:
return "terminate error"
case errKindLimit:
return "limit error"
}
return "unknown error"
}
Expand Down Expand Up @@ -120,8 +123,12 @@ var httpCodeMappings = map[int]struct {
kind: errKindFatal,
},
{
// We are hitting a limit in the maximum number of active streams allowed by Loki.
// There's nothing we can do about it other than reporting it. Because of the
// way Loki works, it's possible that we hit this limit once and don't hit it the next
// time we try to publish data.
substr: "Maximum active stream limit exceeded",
kind: errKindFatal,
kind: errKindLimit,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion internal/pusher/v2/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestParsePushError(t *testing.T) {
Err: errors.New(`Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased, user: '1234'`),
},
expected: pushError{
kind: errKindFatal,
kind: errKindLimit,
},
code: 429,
},
Expand Down
7 changes: 7 additions & 0 deletions internal/pusher/v2/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (q *queue) push(ctx context.Context, remote *sm.RemoteInfo) error {
Msg("store stream failed")
}

// Drop the records by returning their buffers to the pool. They were either published or we
// gave up on them.
size := q.options.pool.returnAll(records)

switch pushErr.Kind() {
Expand All @@ -119,6 +121,11 @@ func (q *queue) push(ctx context.Context, remote *sm.RemoteInfo) error {
// a sample was discarded.
q.options.metrics.ErrorCounter.WithLabelValues(statusCodeStr).Add(numRecords)

case errKindLimit:
// Some (?) of the data was ingested, but we don't have a way to know which part.
// Retrying won't help. Keep going.
q.options.metrics.ErrorCounter.WithLabelValues(statusCodeStr).Add(numRecords)

case errKindTenant, errKindFatal, errKindWait:
// Terminate publisher.
q.options.metrics.ErrorCounter.WithLabelValues(statusCodeStr).Add(numRecords)
Expand Down
9 changes: 7 additions & 2 deletions internal/pusher/v2/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,16 @@ func TestQueuePush(t *testing.T) {
},
"fatal error": {
responses: []http.HandlerFunc{
respond(http.StatusTooManyRequests, "Maximum active stream limit exceeded"),
respond(http.StatusTooManyRequests, "limit: 0 "),
},
expectedErr: pushError{
kind: errKindFatal,
inner: errors.New(`server returned HTTP status 429 Too Many Requests: Maximum active stream limit exceeded`),
inner: errors.New(`server returned HTTP status 429 Too Many Requests: limit: 0 `),
},
},
"limit error": {
responses: []http.HandlerFunc{
respond(http.StatusTooManyRequests, "Maximum active stream limit exceeded"),
},
},
} {
Expand Down
Loading