diff --git a/CHANGELOG.md b/CHANGELOG.md
index 252816a7db..6127bd3cd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
 * [BUGFIX] Ruler: Add XFunctions validation support. #7111
+* [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7116
 
 ## 1.20.0 2025-11-10
 
diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go
index ed8bacd45a..eb0ac91ec0 100644
--- a/pkg/ingester/client/client.go
+++ b/pkg/ingester/client/client.go
@@ -231,7 +231,11 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error {
 			select {
 			case <-ctx.Done():
 				return
-			case job := <-c.streamPushChan:
+			case job, ok := <-c.streamPushChan:
+				if !ok {
+					// Channel closed: job is nil, so stop before dereferencing it.
+					return
+				}
 				err = stream.Send(job.req)
 				if err == io.EOF {
 					job.resp = &cortexpb.WriteResponse{}
diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go
index 02edc8d070..4f8316147a 100644
--- a/pkg/ingester/client/client_test.go
+++ b/pkg/ingester/client/client_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
 
@@ -115,12 +116,22 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ
 
 type mockIngester struct {
 	IngesterClient
+	mock.Mock
 }
 
 func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
 	return &cortexpb.WriteResponse{}, nil
 }
 
+// PushStream returns whatever stream and error the test registered via
+// On("PushStream", ...).Return(stream, err).
+func (m *mockIngester) PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) {
+	args := m.Called(ctx, opts)
+	// Comma-ok keeps a nil stream (error-path tests) from panicking here.
+	stream, _ := args.Get(0).(Ingester_PushStreamClient)
+	return stream, args.Error(1)
+}
+
 type mockClientConn struct {
 	ClosableClientConn
 }
@@ -227,3 +238,49 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
 	assert.True(t, job1Cancelled, "job1 should have been cancelled")
 	assert.True(t, job2Cancelled, "job2 should have been cancelled")
 }
+
+// mockClientStream is a testify-backed push stream; the embedded
+// grpc.ClientStream is left nil, so only Send and Recv may be called.
+type mockClientStream struct {
+	mock.Mock
+	grpc.ClientStream
+}
+
+func (m *mockClientStream) Send(msg *cortexpb.StreamWriteRequest) error {
+	args := m.Called(msg)
+	return args.Error(0)
+}
+
+func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
+	return &cortexpb.WriteResponse{}, nil
+}
+
+// TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose starts the
+// stream worker and then closes the client. The worker is given a context
+// that is never cancelled, so it has to cope with whatever Close does to
+// streamPushChan.
+func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	streamChan := make(chan *streamWriteJob)
+
+	mockIngester := &mockIngester{}
+	mockStream := &mockClientStream{}
+	mockIngester.On("PushStream", mock.Anything, mock.Anything).Return(mockStream, nil).Once()
+
+	client := &closableHealthAndIngesterClient{
+		IngesterClient:       mockIngester,
+		conn:                 &mockClientConn{},
+		addr:                 "test-addr",
+		inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
+		streamCtx:            ctx,
+		streamCancel:         cancel,
+		streamPushChan:       streamChan,
+	}
+	require.NoError(t, client.worker(context.Background()))
+	require.NoError(t, client.Close())
+
+	// Give the worker goroutine time to react to Close; an unrecovered panic
+	// there would crash the test binary.
+	time.Sleep(100 * time.Millisecond)
+	mockIngester.AssertExpectations(t)
+}