diff --git a/internal/pusher/v2/errors.go b/internal/pusher/v2/errors.go index 54867fa7a..712acf78c 100644 --- a/internal/pusher/v2/errors.go +++ b/internal/pusher/v2/errors.go @@ -21,6 +21,7 @@ const ( errKindTenant // A problem with the tenant remotes. Fetch the tenant again. errKindFatal // There is a problem that can't be fixed by fetching the tenant. errKindTerminated // Push terminated (context canceled) + errKindLimit // There is a problem with the limits of the tenant. Discard it. ) func (k errKind) String() string { @@ -39,6 +40,8 @@ func (k errKind) String() string { return "fatal error" case errKindTerminated: return "terminate error" + case errKindLimit: + return "limit error" } return "unknown error" } @@ -120,8 +123,12 @@ var httpCodeMappings = map[int]struct { kind: errKindFatal, }, { + // We are hitting a limit in the maximum number of active streams allowed by Loki. + // There's nothing we can do about it other than reporting it. Because of the + // way Loki works, it's possible that we hit this limit once and don't hit it the next + // time we try to publish data. substr: "Maximum active stream limit exceeded", - kind: errKindFatal, + kind: errKindLimit, }, }, }, diff --git a/internal/pusher/v2/errors_test.go b/internal/pusher/v2/errors_test.go index 349248f54..f83cb575d 100644 --- a/internal/pusher/v2/errors_test.go +++ b/internal/pusher/v2/errors_test.go @@ -140,7 +140,7 @@ func TestParsePushError(t *testing.T) { Err: errors.New(`Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased, user: '1234'`), }, expected: pushError{ - kind: errKindFatal, + kind: errKindLimit, }, code: 429, }, diff --git a/internal/pusher/v2/queue.go b/internal/pusher/v2/queue.go index 31a9c1935..1dbccaec0 100644 --- a/internal/pusher/v2/queue.go +++ b/internal/pusher/v2/queue.go @@ -103,6 +103,8 @@ func (q *queue) push(ctx context.Context, remote *sm.RemoteInfo) error { Msg("store stream failed") } + // Drop the records by returning their buffers to the pool. They were either published or we + // gave up on them. size := q.options.pool.returnAll(records) switch pushErr.Kind() { @@ -119,6 +121,11 @@ func (q *queue) push(ctx context.Context, remote *sm.RemoteInfo) error { // a sample was discarded. q.options.metrics.ErrorCounter.WithLabelValues(statusCodeStr).Add(numRecords) + case errKindLimit: + // Some (?) of the data was ingested, but we don't have a way to know which part. + // Retrying won't help. Keep going. + q.options.metrics.ErrorCounter.WithLabelValues(statusCodeStr).Add(numRecords) + case errKindTenant, errKindFatal, errKindWait: // Terminate publisher. q.options.metrics.ErrorCounter.WithLabelValues(statusCodeStr).Add(numRecords) diff --git a/internal/pusher/v2/queue_test.go b/internal/pusher/v2/queue_test.go index 8d7cbb4cc..a27f6ef8a 100644 --- a/internal/pusher/v2/queue_test.go +++ b/internal/pusher/v2/queue_test.go @@ -304,11 +304,16 @@ func TestQueuePush(t *testing.T) { }, "fatal error": { responses: []http.HandlerFunc{ - respond(http.StatusTooManyRequests, "Maximum active stream limit exceeded"), + respond(http.StatusTooManyRequests, "limit: 0 "), }, expectedErr: pushError{ kind: errKindFatal, - inner: errors.New(`server returned HTTP status 429 Too Many Requests: Maximum active stream limit exceeded`), + inner: errors.New(`server returned HTTP status 429 Too Many Requests: limit: 0 `), + }, + }, + "limit error": { + responses: []http.HandlerFunc{ + respond(http.StatusTooManyRequests, "Maximum active stream limit exceeded"), }, }, } {