Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,16 @@ func TestFrontend_Protobuf_HappyPath(t *testing.T) {

msg, err := resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf := msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[0], msg)

msg, err = resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf = msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[1], msg)

// Response stream exhausted.
Expand Down Expand Up @@ -397,12 +401,16 @@ func TestFrontend_Protobuf_QuerierResponseReceivedBeforeSchedulerResponse(t *tes

msg, err := resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf := msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[0], msg)

msg, err = resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf = msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[1], msg)

// Response stream exhausted.
Expand Down Expand Up @@ -443,7 +451,9 @@ func TestFrontend_Protobuf_ResponseClosedBeforeStreamExhausted(t *testing.T) {

msg, err := resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf := msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[0], msg)
resp.Close() // We expect all goroutines to be cleaned up after this (verified by the VerifyNoLeakTestMain call in TestMain above)
}
Expand Down Expand Up @@ -680,7 +690,9 @@ func TestFrontend_Protobuf_RetryEnqueue(t *testing.T) {

msg, err := resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf := msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[0], msg)
}

Expand Down Expand Up @@ -729,12 +741,16 @@ func TestFrontend_Protobuf_ReadingResponseAfterAllMessagesReceived(t *testing.T)

msg, err := resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf := msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[0], msg)

msg, err = resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf = msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[1], msg)

// Wait until the last message has been buffered into the stream channel and the stream's context has been cancelled by DoProtobufRequest.
Expand All @@ -747,7 +763,9 @@ func TestFrontend_Protobuf_ReadingResponseAfterAllMessagesReceived(t *testing.T)

msg, err = resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf = msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, expectedMessages[2], msg, "should still be able to read last message after stream has been completely read")

msg, err = resp.Next(ctx)
Expand Down Expand Up @@ -1302,12 +1320,16 @@ func TestFrontend_Protobuf_ResponseSentTwice(t *testing.T) {

msg, err := resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf := msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, firstMessage, msg)

msg, err = resp.Next(ctx)
require.NoError(t, err)
msg.FreeBuffer() // We don't care about the contents of the buffer in the assertion below.
buf = msg.Buffer()
defer buf.Free()
msg.SetBuffer(nil) // We don't care about the contents of the buffer in the assertion below.
require.Equal(t, secondMessage, msg)

// Response stream exhausted.
Expand Down
32 changes: 31 additions & 1 deletion pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,17 @@ type BufferHolder struct {
buffer mem.Buffer
}

func (m *BufferHolder) Buffer() mem.Buffer {
return m.buffer
}

func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *BufferHolder) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

Expand Down Expand Up @@ -534,3 +537,30 @@ type orderAwareMetricMetadata struct {
// order is the 0-based index of this metadata object in a wider metadata array.
order int
}

func (m *WriteRequest) FreeBuffer() {
m.BufferHolder.FreeBuffer()
for p := range m.sourceBufferHolders {
p.FreeBuffer()
}
}

// AddSourceBufferHolder adds a source BufferHolder to the WriteRequest,
// retaining a strong reference to the source buffer. See
// [WriteRequest.SourceBufferHolders].
func (m *WriteRequest) AddSourceBufferHolder(bufh *BufferHolder) {
buf := bufh.Buffer()
if buf == nil {
return
}
if _, ok := m.sourceBufferHolders[bufh]; ok {
return
}

buf.Ref()

if m.sourceBufferHolders == nil {
m.sourceBufferHolders = map[*BufferHolder]struct{}{}
}
m.sourceBufferHolders[bufh] = struct{}{}
}
5 changes: 5 additions & 0 deletions pkg/mimirpb/mimir.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading