From 788f4152726957dc209bcad470b31c16fd08380c Mon Sep 17 00:00:00 2001 From: ka3de Date: Mon, 13 Oct 2025 10:59:52 +0200 Subject: [PATCH 1/5] feat: Adjust max tenant idle time Specifies the time to wait for new check execution's data for the tenant before removing the handler, which would cause any undelivered data to be lost. Adjust it to 65 minutes. In the worst case scenario, where the publisher is unable to push for 1h, setting it to 65 minutes will allow the longer running checks (current max=1h) to still buffer data for new executions. --- internal/pusher/v2/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pusher/v2/options.go b/internal/pusher/v2/options.go index 20a097d98..ce131de66 100644 --- a/internal/pusher/v2/options.go +++ b/internal/pusher/v2/options.go @@ -53,7 +53,7 @@ var ( // How long without receiving check results until a tenant pusher is stopped. // This is to cleanup tenants that don't have active checks anymore. // Set it to a value higher than the max interval between a single check run. - maxIdleTime: 5 * time.Minute, + maxIdleTime: 65 * time.Minute, // How long to wait before refreshing tenant due to an error. tenantDelay: 10 * time.Second, From c5cf7e834ed109264932ef9d3a8bb2bbf8de067b Mon Sep 17 00:00:00 2001 From: ka3de Date: Mon, 13 Oct 2025 11:19:32 +0200 Subject: [PATCH 2/5] feat: Adjust backoff and retries When retries are exhausted, the data being pushed is not added again to the buffer queue, and instead it's discarded. With the previous configuration (20 retries, 30ms min backoff, 2s max backoff), if we consider that each push request takes ~200ms, the total time before exhausting retries and discarding data was ~35s. With the new configuration (50 retries, 50ms min backoff, 30s max backoff), if we consider that each push request takes ~200ms, the total time before exhausting retries and discarding data is ~20min. Notice that data might still be discarded earlier due to buffer size limits. 
--- internal/pusher/v2/options.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pusher/v2/options.go b/internal/pusher/v2/options.go index ce131de66..f364bf414 100644 --- a/internal/pusher/v2/options.go +++ b/internal/pusher/v2/options.go @@ -37,11 +37,11 @@ var ( // Max number of retries in case of network(retriable) error. // Ideally we make this big and have data expired with the above limits. - maxRetries: 20, + maxRetries: 50, // Backoff between retries. Doubling at each attempt. - minBackoff: time.Millisecond * 30, - maxBackoff: time.Second * 2, + minBackoff: time.Millisecond * 50, + maxBackoff: time.Second * 30, // Max time a tenant pusher is active. This is useful to cause the tenant info // to be refreshed. From 85179cc04b9e82b9d68ce985427726195a68835c Mon Sep 17 00:00:00 2001 From: ka3de Date: Mon, 13 Oct 2025 11:58:09 +0200 Subject: [PATCH 3/5] feat: Increase max queued bytes Specifies the max number of bytes to hold in the buffer per tenant in case of unsuccessful retries before discarding data. Lacking better data for calculations, as a rough estimate we can consider each check execution to report ~15 metrics with 10 labels and one sample each. That will give us a rough estimate of ~5KB per check execution. With the previous value of 128KB, that would hold data for 25 check executions. In the best scenario where a tenant runs a single check in that agent, considering a check frequency of 1m, that would allow for 25 minutes before discarding data. In most cases this number will be much lower, as a tenant will be running more than one check in the same agent, resulting in a very short time window before data is discarded. Therefore, increase the buffer limit size to 1MB. Considering that this will only have an effect when data is unable to be pushed to particular Mimir/Loki cells, accept the memory increase in order to try to reduce discarded data. 
Autoscalers will scale agents if necessary based on the memory threshold. --- internal/pusher/v2/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pusher/v2/options.go b/internal/pusher/v2/options.go index f364bf414..c4a722a65 100644 --- a/internal/pusher/v2/options.go +++ b/internal/pusher/v2/options.go @@ -24,7 +24,7 @@ var ( // Max bytes to hold queued // 0: Disabled - maxQueuedBytes: 128 * 1024, + maxQueuedBytes: 1024 * 1024, // Max items (check results) to hold in memory (per tenant per type) // 0: Disabled From 9b4e3002b585cd64bd5b0aaa7541dd5fd508eee6 Mon Sep 17 00:00:00 2001 From: ka3de Date: Mon, 13 Oct 2025 12:29:39 +0200 Subject: [PATCH 4/5] feat: Avoid discarding data based on num items The limit on the number of items is difficult to reason about, and it is hard to choose a reasonable value for it. Therefore remove this limit and apply only the size limit, which fulfills the same purpose. --- internal/pusher/v2/options.go | 5 ----- internal/pusher/v2/queue.go | 15 +------------ internal/pusher/v2/queue_test.go | 37 -------------------------------- 3 files changed, 1 insertion(+), 56 deletions(-) diff --git a/internal/pusher/v2/options.go b/internal/pusher/v2/options.go index c4a722a65..c65e61beb 100644 --- a/internal/pusher/v2/options.go +++ b/internal/pusher/v2/options.go @@ -26,10 +26,6 @@ var ( // 0: Disabled maxQueuedBytes: 1024 * 1024, - // Max items (check results) to hold in memory (per tenant per type) - // 0: Disabled - maxQueuedItems: 128, - // Max time to keep an item in the queue before it's discarded // Note that loki/mimir will probably reject data older than 1h anyway. 
// 0: Disabled @@ -78,7 +74,6 @@ var ( type pusherOptions struct { maxPushBytes uint64 // Max bytes to send on a single push request maxQueuedBytes uint64 // Max bytes to hold queued - maxQueuedItems int // Max items (check results) to hold in memory maxQueuedTime time.Duration // Max time an item can be queued until it expires maxRetries int // Max retries for a push minBackoff time.Duration diff --git a/internal/pusher/v2/queue.go b/internal/pusher/v2/queue.go index 1dbccaec0..6f5731a95 100644 --- a/internal/pusher/v2/queue.go +++ b/internal/pusher/v2/queue.go @@ -155,25 +155,12 @@ func (q *queue) insert(data *[]byte) { } func (q *queue) applyLimits() { - numDropped := q.limitNumItems(q.options.maxQueuedItems) + q.limitBytes(q.options.maxQueuedBytes) + q.limitAge(q.options.maxQueuedTime) + numDropped := q.limitBytes(q.options.maxQueuedBytes) + q.limitAge(q.options.maxQueuedTime) if numDropped > 0 { q.options.metrics.DroppedCounter.WithLabelValues().Add(float64(numDropped)) } } -func (q *queue) limitNumItems(max int) (numRemoved int) { - var ( - n = len(q.data) - excess = n - max - ) - if max <= 0 || excess <= 0 { - return 0 - } - q.options.pool.returnAll(q.data[:excess]) - q.data = q.data[excess:] - return excess -} - func (q *queue) limitBytes(max uint64) (numRemoved int) { n := len(q.data) if max <= 0 { diff --git a/internal/pusher/v2/queue_test.go b/internal/pusher/v2/queue_test.go index a27f6ef8a..11368f41c 100644 --- a/internal/pusher/v2/queue_test.go +++ b/internal/pusher/v2/queue_test.go @@ -22,7 +22,6 @@ func TestQueue(t *testing.T) { defaultOptions := pusherOptions{ maxPushBytes: 1024 * 1024, maxQueuedBytes: 0, - maxQueuedItems: 0, maxQueuedTime: 0, } @@ -136,42 +135,6 @@ func TestQueue(t *testing.T) { }, countDropped: 2, }, - "max queued items": { - options: &pusherOptions{ - maxPushBytes: 1024, - maxQueuedItems: 2, - }, - actions: []testAction{ - insert(9), - insert(11), - expect(timeout, []int{9, 11}), - insert(10), - insert(11), - insert(15), - 
insert(5), - insert(1), - expect(timeout, []int{5, 1}), - expectEmpty(), - }, - countDropped: 3, - }, - "max queued items return last": { - options: &pusherOptions{ - maxPushBytes: 1024, - maxQueuedItems: 3, - }, - actions: []testAction{ - insert(1), - insert(2), - expect(timeout, []int{1, 2}), - insert(3), - insert(4), - returnLast(), - expect(timeout, []int{2, 3, 4}), - expectEmpty(), - }, - countDropped: 1, - }, "max queued time": { options: &pusherOptions{ maxQueuedTime: 100 * time.Millisecond, From a75a29eb3a8f97f47339747f878760fc68223312 Mon Sep 17 00:00:00 2001 From: ka3de Date: Mon, 13 Oct 2025 12:31:42 +0200 Subject: [PATCH 5/5] fix: Fix queue test Adjust backoff for testing purposes. --- internal/pusher/v2/queue_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/pusher/v2/queue_test.go b/internal/pusher/v2/queue_test.go index 11368f41c..49d207614 100644 --- a/internal/pusher/v2/queue_test.go +++ b/internal/pusher/v2/queue_test.go @@ -190,6 +190,11 @@ func TestQueuePush(t *testing.T) { _, _ = w.Write([]byte(body)) } } + + testOptions := defaultPusherOptions + testOptions.minBackoff = 10 * time.Millisecond + testOptions.maxBackoff = 50 * time.Millisecond + for title, tc := range map[string]struct { options *pusherOptions responses []http.HandlerFunc @@ -240,7 +245,7 @@ func TestQueuePush(t *testing.T) { }, "max retries": { options: func() *pusherOptions { - opt := defaultPusherOptions + opt := testOptions opt.maxRetries = 3 return &opt }(), @@ -293,7 +298,7 @@ func TestQueuePush(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() - opt := defaultPusherOptions + opt := testOptions if tc.options != nil { opt = *tc.options }