Skip to content

Commit 74653ac

Browse files
authored
fix: no longer drop request if stream is dropped in loki.source.api (#4834)
* fix: add ForceShutdown that will cancel in-flight requests before stopping server * Split into multiple files and add LogsBatchReceiver * don't drop request when relabel rules drops a specific stream * fix: use loki.LogsBatchReceiver to ensure all entries in a request is sent down the pipeline * add changelog * add checks for entries and use sync once to close channel
1 parent 72b9c94 commit 74653ac

File tree

11 files changed

+252
-152
lines changed

11 files changed

+252
-152
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Main (unreleased)
1414

1515
- update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep)
1616

17+
### Bugfixes
18+
19+
- `loki.source.api` no longer drops request when relabel rules drops a specific stream. (@kalleep)
20+
1721
v1.12.0-rc.0
1822
-----------------
1923

docs/sources/reference/components/loki/loki.source.api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ The metrics include labels such as `status_code` where relevant, which can be us
105105
* `loki_source_api_request_message_bytes` (histogram): Size (in bytes) of messages received in the request.
106106
* `loki_source_api_response_message_bytes` (histogram): Size (in bytes) of messages sent in response.
107107
* `loki_source_api_tcp_connections` (gauge): Current number of accepted TCP connections.
108+
* `loki_source_api_entries_written` (counter): Total number of log entries forwarded.
108109

109110
## Example
110111

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package loki
2+
3+
import (
4+
"github.com/grafana/loki/pkg/push"
5+
"github.com/prometheus/common/model"
6+
)
7+
8+
// Entry is a log entry with labels.
9+
type Entry struct {
10+
Labels model.LabelSet
11+
push.Entry
12+
}
13+
14+
// Clone returns a copy of the entry so that it can be safely fanned out.
15+
func (e *Entry) Clone() Entry {
16+
return Entry{
17+
Labels: e.Labels.Clone(),
18+
Entry: e.Entry,
19+
}
20+
}

internal/component/common/loki/types.go renamed to internal/component/common/loki/entry_handler.go

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"sync"
1010
"time"
1111

12-
"github.com/grafana/loki/pkg/push"
1312
"github.com/prometheus/common/model"
1413
)
1514

@@ -21,68 +20,6 @@ import (
2120
// to an outage or erroring (such as limits being hit).
2221
const finalEntryTimeout = 5 * time.Second
2322

24-
// LogReceiverOption is an option argument passed to NewLogsReceiver.
25-
type LogReceiverOption func(*logsReceiver)
26-
27-
func WithChannel(c chan Entry) LogReceiverOption {
28-
return func(l *logsReceiver) {
29-
l.entries = c
30-
}
31-
}
32-
33-
func WithComponentID(id string) LogReceiverOption {
34-
return func(l *logsReceiver) {
35-
l.componentID = id
36-
}
37-
}
38-
39-
// LogsReceiver is an interface providing `chan Entry` which is used for component
40-
// communication.
41-
type LogsReceiver interface {
42-
Chan() chan Entry
43-
}
44-
45-
type logsReceiver struct {
46-
entries chan Entry
47-
componentID string
48-
}
49-
50-
func (l *logsReceiver) Chan() chan Entry {
51-
return l.entries
52-
}
53-
54-
func (l *logsReceiver) String() string {
55-
return l.componentID + ".receiver"
56-
}
57-
58-
func NewLogsReceiver(opts ...LogReceiverOption) LogsReceiver {
59-
l := &logsReceiver{}
60-
61-
for _, o := range opts {
62-
o(l)
63-
}
64-
65-
if l.entries == nil {
66-
l.entries = make(chan Entry)
67-
}
68-
69-
return l
70-
}
71-
72-
// Entry is a log entry with labels.
73-
type Entry struct {
74-
Labels model.LabelSet
75-
push.Entry
76-
}
77-
78-
// Clone returns a copy of the entry so that it can be safely fanned out.
79-
func (e *Entry) Clone() Entry {
80-
return Entry{
81-
Labels: e.Labels.Clone(),
82-
Entry: e.Entry,
83-
}
84-
}
85-
8623
// EntryHandler is something that can "handle" entries via a channel.
8724
// Stop must be called to gracefully shut down the EntryHandler
8825
type EntryHandler interface {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package loki
2+
3+
// LogReceiverOption is an option argument passed to NewLogsReceiver.
4+
type LogReceiverOption func(*logsReceiver)
5+
6+
func WithChannel(c chan Entry) LogReceiverOption {
7+
return func(l *logsReceiver) {
8+
l.entries = c
9+
}
10+
}
11+
12+
func WithComponentID(id string) LogReceiverOption {
13+
return func(l *logsReceiver) {
14+
l.componentID = id
15+
}
16+
}
17+
18+
// LogsReceiver is an interface providing `chan Entry` which is used for component
19+
// communication.
20+
type LogsReceiver interface {
21+
Chan() chan Entry
22+
}
23+
24+
type logsReceiver struct {
25+
entries chan Entry
26+
componentID string
27+
}
28+
29+
func (l *logsReceiver) Chan() chan Entry {
30+
return l.entries
31+
}
32+
33+
func (l *logsReceiver) String() string {
34+
return l.componentID + ".receiver"
35+
}
36+
37+
func NewLogsReceiver(opts ...LogReceiverOption) LogsReceiver {
38+
l := &logsReceiver{}
39+
40+
for _, o := range opts {
41+
o(l)
42+
}
43+
44+
if l.entries == nil {
45+
l.entries = make(chan Entry)
46+
}
47+
48+
return l
49+
}
50+
51+
// LogsBatchReceiver is an interface providing `chan []Entry`. This should be used when
52+
// multiple entries need to be sent over a channel.
53+
type LogsBatchReceiver interface {
54+
Chan() chan []Entry
55+
}
56+
57+
func NewLogsBatchReceiver() LogsBatchReceiver {
58+
return &logsBatchReceiver{c: make(chan []Entry)}
59+
}
60+
61+
type logsBatchReceiver struct {
62+
c chan []Entry
63+
}
64+
65+
func (l *logsBatchReceiver) Chan() chan []Entry {
66+
return l.c
67+
}

internal/component/loki/source/api/api.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (a *Arguments) labelSet() model.LabelSet {
5757

5858
type Component struct {
5959
opts component.Options
60-
handler loki.LogsReceiver
60+
handler loki.LogsBatchReceiver
6161
uncheckedCollector *util.UncheckedCollector
6262

6363
serverMut sync.Mutex
@@ -72,7 +72,7 @@ type Component struct {
7272
func New(opts component.Options, args Arguments) (*Component, error) {
7373
c := &Component{
7474
opts: opts,
75-
handler: loki.NewLogsReceiver(),
75+
handler: loki.NewLogsBatchReceiver(),
7676
receivers: args.ForwardTo,
7777
uncheckedCollector: util.NewUncheckedCollector(nil),
7878
}
@@ -86,23 +86,30 @@ func New(opts component.Options, args Arguments) (*Component, error) {
8686

8787
func (c *Component) Run(ctx context.Context) (err error) {
8888
defer func() {
89-
c.stop()
89+
c.serverMut.Lock()
90+
defer c.serverMut.Unlock()
91+
if c.server != nil {
92+
// We want to cancel all in-flight request when component stops.
93+
c.server.ForceShutdown()
94+
c.server = nil
95+
}
9096
}()
9197

9298
for {
9399
select {
94-
case entry := <-c.handler.Chan():
100+
case entries := <-c.handler.Chan():
95101
c.receiversMut.RLock()
96-
receivers := c.receivers
97-
c.receiversMut.RUnlock()
98-
99-
for _, receiver := range receivers {
100-
select {
101-
case receiver.Chan() <- entry:
102-
case <-ctx.Done():
103-
return
102+
for _, entry := range entries {
103+
for _, receiver := range c.receivers {
104+
select {
105+
case receiver.Chan() <- entry:
106+
case <-ctx.Done():
107+
c.receiversMut.RUnlock()
108+
return
109+
}
104110
}
105111
}
112+
c.receiversMut.RUnlock()
106113
case <-ctx.Done():
107114
return
108115
}
@@ -164,12 +171,3 @@ func (c *Component) Update(args component.Arguments) error {
164171

165172
return nil
166173
}
167-
168-
func (c *Component) stop() {
169-
c.serverMut.Lock()
170-
defer c.serverMut.Unlock()
171-
if c.server != nil {
172-
c.server.Shutdown()
173-
c.server = nil
174-
}
175-
}

internal/component/loki/source/api/api_test.go

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) {
105105
a.UseIncomingTimestamp = true
106106
})
107107
opts := defaultOptions()
108-
_, shutdown := startTestComponent(t, opts, args, ctx)
109-
defer shutdown()
108+
_ = startTestComponent(t, opts, args, ctx)
110109

111110
lokiClient := newTestLokiClient(t, args, opts)
112111
defer lokiClient.Stop()
@@ -152,8 +151,7 @@ func TestLokiSourceAPI_Update(t *testing.T) {
152151
a.Labels = map[string]string{"test_label": "before"}
153152
})
154153
opts := defaultOptions()
155-
c, shutdown := startTestComponent(t, opts, args, ctx)
156-
defer shutdown()
154+
c := startTestComponent(t, opts, args, ctx)
157155

158156
lokiClient := newTestLokiClient(t, args, opts)
159157
defer lokiClient.Stop()
@@ -219,7 +217,7 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
219217

220218
const receiversCount = 10
221219
var receivers = make([]*fake.Client, receiversCount)
222-
for i := 0; i < receiversCount; i++ {
220+
for i := range receiversCount {
223221
receivers[i] = fake.NewClient(func() {})
224222
}
225223

@@ -236,8 +234,6 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
236234
require.NoError(t, err)
237235
}()
238236

239-
defer comp.stop()
240-
241237
lokiClient := newTestLokiClient(t, args, opts)
242238
defer lokiClient.Stop()
243239

@@ -344,25 +340,19 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
344340
}
345341
for _, tc := range tests {
346342
t.Run(tc.name, func(t *testing.T) {
347-
comp, err := New(
348-
defaultOptions(),
349-
tc.args,
350-
)
351-
require.NoError(t, err)
343+
ctx, cancel := context.WithCancel(t.Context())
344+
defer cancel()
352345

353-
// in order to cleanly update, we want to make sure the server is running first.
354-
waitForServerToBeReady(t, comp)
346+
comp := startTestComponent(t, defaultOptions(), tc.args, ctx)
355347

356348
serverBefore := comp.server
357-
err = comp.Update(tc.newArgs)
358-
require.NoError(t, err)
349+
require.NoError(t, comp.Update(tc.newArgs))
359350

360351
restarted := serverBefore != comp.server
361352
assert.Equal(t, restarted, tc.restartRequired)
362353

363354
// in order to cleanly shutdown, we want to make sure the server is running first.
364355
waitForServerToBeReady(t, comp)
365-
comp.stop()
366356
})
367357
}
368358
}
@@ -388,8 +378,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
388378
a.UseIncomingTimestamp = true
389379
})
390380
opts := defaultOptions()
391-
_, shutdown := startTestComponent(t, opts, args, ctx)
392-
defer shutdown()
381+
_ = startTestComponent(t, opts, args, ctx)
393382

394383
// Create TLS-enabled Loki client
395384
lokiClient := newTestLokiClientTLS(t, args, opts)
@@ -457,6 +446,13 @@ func TestDefaultServerConfig(t *testing.T) {
457446
defaultOptions(),
458447
args,
459448
)
449+
450+
ctx := t.Context()
451+
go func() {
452+
err := comp.Run(ctx)
453+
require.NoError(t, err)
454+
}()
455+
460456
require.NoError(t, err)
461457

462458
require.Eventuallyf(t, func() bool {
@@ -467,16 +463,14 @@ func TestDefaultServerConfig(t *testing.T) {
467463
))
468464
return err == nil && resp.StatusCode == 404
469465
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
470-
471-
comp.stop()
472466
}
473467

474468
func startTestComponent(
475469
t *testing.T,
476470
opts component.Options,
477471
args Arguments,
478472
ctx context.Context,
479-
) (component.Component, func()) {
473+
) *Component {
480474

481475
comp, err := New(opts, args)
482476
require.NoError(t, err)
@@ -485,11 +479,8 @@ func startTestComponent(
485479
require.NoError(t, err)
486480
}()
487481

488-
return comp, func() {
489-
// in order to cleanly shutdown, we want to make sure the server is running first.
490-
waitForServerToBeReady(t, comp)
491-
comp.stop()
492-
}
482+
waitForServerToBeReady(t, comp)
483+
return comp
493484
}
494485

495486
func TestShutdown(t *testing.T) {

0 commit comments

Comments
 (0)