Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions internal/pusher/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ var (

// Max bytes to hold queued
// 0: Disabled
maxQueuedBytes: 128 * 1024,

// Max items (check results) to hold in memory (per tenant per type)
// 0: Disabled
maxQueuedItems: 128,
maxQueuedBytes: 1024 * 1024,

// Max time to keep an item in the queue before it's discarded
// Note that loki/mimir will probably reject data older than 1h anyway.
Expand All @@ -37,11 +33,11 @@ var (

// Max number of retries in case of network(retriable) error.
// Ideally we make this big and have data expired with the above limits.
maxRetries: 20,
maxRetries: 50,

// Backoff between retries. Doubling at each attempt.
minBackoff: time.Millisecond * 30,
maxBackoff: time.Second * 2,
minBackoff: time.Millisecond * 50,
maxBackoff: time.Second * 30,

// Max time a tenant pusher is active. This is useful to cause the tenant info
// to be refreshed.
Expand All @@ -53,7 +49,7 @@ var (
// How long without receiving check results until a tenant pusher is stopped.
// This is to cleanup tenants that don't have active checks anymore.
// Set it to a value higher than the max interval between a single check run.
maxIdleTime: 5 * time.Minute,
maxIdleTime: 65 * time.Minute,

// How long to wait before refreshing tenant due to an error.
tenantDelay: 10 * time.Second,
Expand All @@ -78,7 +74,6 @@ var (
type pusherOptions struct {
maxPushBytes uint64 // Max bytes to send on a single push request
maxQueuedBytes uint64 // Max bytes to hold queued
maxQueuedItems int // Max items (check results) to hold in memory
maxQueuedTime time.Duration // Max time an item can be queued until it expires
maxRetries int // Max retries for a push
minBackoff time.Duration
Expand Down
15 changes: 1 addition & 14 deletions internal/pusher/v2/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,12 @@ func (q *queue) insert(data *[]byte) {
}

func (q *queue) applyLimits() {
numDropped := q.limitNumItems(q.options.maxQueuedItems) + q.limitBytes(q.options.maxQueuedBytes) + q.limitAge(q.options.maxQueuedTime)
numDropped := q.limitBytes(q.options.maxQueuedBytes) + q.limitAge(q.options.maxQueuedTime)
if numDropped > 0 {
q.options.metrics.DroppedCounter.WithLabelValues().Add(float64(numDropped))
}
}

func (q *queue) limitNumItems(max int) (numRemoved int) {
var (
n = len(q.data)
excess = n - max
)
if max <= 0 || excess <= 0 {
return 0
}
q.options.pool.returnAll(q.data[:excess])
q.data = q.data[excess:]
return excess
}

func (q *queue) limitBytes(max uint64) (numRemoved int) {
n := len(q.data)
if max <= 0 {
Expand Down
46 changes: 7 additions & 39 deletions internal/pusher/v2/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestQueue(t *testing.T) {
defaultOptions := pusherOptions{
maxPushBytes: 1024 * 1024,
maxQueuedBytes: 0,
maxQueuedItems: 0,
maxQueuedTime: 0,
}

Expand Down Expand Up @@ -136,42 +135,6 @@ func TestQueue(t *testing.T) {
},
countDropped: 2,
},
"max queued items": {
options: &pusherOptions{
maxPushBytes: 1024,
maxQueuedItems: 2,
},
actions: []testAction{
insert(9),
insert(11),
expect(timeout, []int{9, 11}),
insert(10),
insert(11),
insert(15),
insert(5),
insert(1),
expect(timeout, []int{5, 1}),
expectEmpty(),
},
countDropped: 3,
},
"max queued items return last": {
options: &pusherOptions{
maxPushBytes: 1024,
maxQueuedItems: 3,
},
actions: []testAction{
insert(1),
insert(2),
expect(timeout, []int{1, 2}),
insert(3),
insert(4),
returnLast(),
expect(timeout, []int{2, 3, 4}),
expectEmpty(),
},
countDropped: 1,
},
"max queued time": {
options: &pusherOptions{
maxQueuedTime: 100 * time.Millisecond,
Expand Down Expand Up @@ -227,6 +190,11 @@ func TestQueuePush(t *testing.T) {
_, _ = w.Write([]byte(body))
}
}

testOptions := defaultPusherOptions
testOptions.minBackoff = 10 * time.Millisecond
testOptions.maxBackoff = 50 * time.Millisecond

for title, tc := range map[string]struct {
options *pusherOptions
responses []http.HandlerFunc
Expand Down Expand Up @@ -277,7 +245,7 @@ func TestQueuePush(t *testing.T) {
},
"max retries": {
options: func() *pusherOptions {
opt := defaultPusherOptions
opt := testOptions
opt.maxRetries = 3
return &opt
}(),
Expand Down Expand Up @@ -330,7 +298,7 @@ func TestQueuePush(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()

opt := defaultPusherOptions
opt := testOptions
if tc.options != nil {
opt = *tc.options
}
Expand Down
Loading