diff --git a/internal/pusher/v2/options.go b/internal/pusher/v2/options.go index 20a097d98..c65e61beb 100644 --- a/internal/pusher/v2/options.go +++ b/internal/pusher/v2/options.go @@ -24,11 +24,7 @@ var ( // Max bytes to hold queued // 0: Disabled - maxQueuedBytes: 128 * 1024, - - // Max items (check results) to hold in memory (per tenant per type) - // 0: Disabled - maxQueuedItems: 128, + maxQueuedBytes: 1024 * 1024, // Max time to keep an item in the queue before it's discarded // Note that loki/mimir will probably reject data older than 1h anyway. @@ -37,11 +33,11 @@ var ( // Max number of retries in case of network(retriable) error. // Ideally we make this big and have data expired with the above limits. - maxRetries: 20, + maxRetries: 50, // Backoff between retries. Doubling at each attempt. - minBackoff: time.Millisecond * 30, - maxBackoff: time.Second * 2, + minBackoff: time.Millisecond * 50, + maxBackoff: time.Second * 30, // Max time a tenant pusher is active. This is useful to cause the tenant info // to be refreshed. @@ -53,7 +49,7 @@ var ( // How long without receiving check results until a tenant pusher is stopped. // This is to cleanup tenants that don't have active checks anymore. // Set it to a value higher than the max interval between a single check run. - maxIdleTime: 5 * time.Minute, + maxIdleTime: 65 * time.Minute, // How long to wait before refreshing tenant due to an error. tenantDelay: 10 * time.Second, @@ -78,7 +74,6 @@ var ( type pusherOptions struct { maxPushBytes uint64 // Max bytes to send on a single push request maxQueuedBytes uint64 // Max bytes to hold queued - maxQueuedItems int // Max items (check results) to hold in memory maxQueuedTime time.Duration // Max time an item can be queued until it expires maxRetries int // Max retries for a push minBackoff time.Duration diff --git a/internal/pusher/v2/queue.go b/internal/pusher/v2/queue.go index 1dbccaec0..6f5731a95 100644 --- a/internal/pusher/v2/queue.go +++ b/internal/pusher/v2/queue.go @@ -155,25 +155,12 @@ func (q *queue) insert(data *[]byte) { } func (q *queue) applyLimits() { - numDropped := q.limitNumItems(q.options.maxQueuedItems) + q.limitBytes(q.options.maxQueuedBytes) + q.limitAge(q.options.maxQueuedTime) + numDropped := q.limitBytes(q.options.maxQueuedBytes) + q.limitAge(q.options.maxQueuedTime) if numDropped > 0 { q.options.metrics.DroppedCounter.WithLabelValues().Add(float64(numDropped)) } } -func (q *queue) limitNumItems(max int) (numRemoved int) { - var ( - n = len(q.data) - excess = n - max - ) - if max <= 0 || excess <= 0 { - return 0 - } - q.options.pool.returnAll(q.data[:excess]) - q.data = q.data[excess:] - return excess -} - func (q *queue) limitBytes(max uint64) (numRemoved int) { n := len(q.data) if max <= 0 { diff --git a/internal/pusher/v2/queue_test.go b/internal/pusher/v2/queue_test.go index a27f6ef8a..49d207614 100644 --- a/internal/pusher/v2/queue_test.go +++ b/internal/pusher/v2/queue_test.go @@ -22,7 +22,6 @@ func TestQueue(t *testing.T) { defaultOptions := pusherOptions{ maxPushBytes: 1024 * 1024, maxQueuedBytes: 0, - maxQueuedItems: 0, maxQueuedTime: 0, } @@ -136,42 +135,6 @@ func TestQueue(t *testing.T) { }, countDropped: 2, }, - "max queued items": { - options: &pusherOptions{ - maxPushBytes: 1024, - maxQueuedItems: 2, - }, - actions: []testAction{ - insert(9), - insert(11), - expect(timeout, []int{9, 11}), - insert(10), - insert(11), - insert(15), - insert(5), - insert(1), - expect(timeout, []int{5, 1}), - expectEmpty(), - }, - countDropped: 3, - }, - "max queued items return last": { - options: &pusherOptions{ - maxPushBytes: 1024, - maxQueuedItems: 3, - }, - actions: []testAction{ - insert(1), - insert(2), - expect(timeout, []int{1, 2}), - insert(3), - insert(4), - returnLast(), - expect(timeout, []int{2, 3, 4}), - expectEmpty(), - }, - countDropped: 1, - }, "max queued time": { options: &pusherOptions{ maxQueuedTime: 100 * time.Millisecond, @@ -227,6 +190,11 @@ func TestQueuePush(t *testing.T) { _, _ = w.Write([]byte(body)) } } + + testOptions := defaultPusherOptions + testOptions.minBackoff = 10 * time.Millisecond + testOptions.maxBackoff = 50 * time.Millisecond + for title, tc := range map[string]struct { options *pusherOptions responses []http.HandlerFunc @@ -277,7 +245,7 @@ func TestQueuePush(t *testing.T) { }, "max retries": { options: func() *pusherOptions { - opt := defaultPusherOptions + opt := testOptions opt.maxRetries = 3 return &opt }(), @@ -330,7 +298,7 @@ func TestQueuePush(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() - opt := defaultPusherOptions + opt := testOptions if tc.options != nil { opt = *tc.options }