diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go
index 44ce4b85fb5ae..e800d30e6c633 100644
--- a/pkg/dataobj/consumer/service.go
+++ b/pkg/dataobj/consumer/service.go
@@ -127,6 +127,7 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto
 		processorFactory.New,
 		logger,
 		prometheus.WrapRegistererWithPrefix("loki_dataobj_consumer_", reg),
+		partitionInstanceLifecycler,
 	)
 	if err != nil {
 		return nil, err
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 6cc74132c453d..e24e1db47f65f 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -392,6 +392,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 		NewKafkaConsumerFactory(i, registerer, cfg.KafkaIngestion.KafkaConfig.MaxConsumerWorkers),
 		logger,
 		registerer,
+		i.partitionRingLifecycler,
 		partition.WithHeaderToContextExtractor(validation.IngestionPoliciesKafkaHeadersToContext),
 	)
 	if err != nil {
diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index cb3bcb7bd6f58..6943939a97826 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"sync"
 	"time"
 
 	"github.com/go-kit/log"
@@ -43,6 +44,9 @@ type Reader interface {
 	// SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader.
 	// For example, we can use this to differentiate between the startup phase and the running phase.
 	SetPhase(phase string)
+	// SetPartitionState sets the partition ring state for the reader. This is used to track the partition's
+	// state in the ring (pending, active, inactive) for metrics labeling purposes.
+	SetPartitionState(state string)
 }
 
 // ReaderMetrics contains metrics specific to Kafka reading operations
@@ -65,7 +69,7 @@ func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
 			NativeHistogramMaxBucketNumber:  100,
 			NativeHistogramMinResetDuration: 1 * time.Hour,
 			Buckets:                         prometheus.ExponentialBuckets(0.125, 2, 18),
-		}, []string{"phase"}),
+		}, []string{"phase", "partition_state"}),
 		fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
 			Namespace: client.MetricsPrefix,
 			Name:      "partition_reader_fetch_wait_duration_seconds",
@@ -99,6 +103,8 @@ type KafkaReader struct {
 	consumerGroup            string
 	metrics                  *ReaderMetrics
 	phase                    string
+	partitionStateMu         sync.RWMutex
+	partitionState           string
 	logger                   log.Logger
 	headerToContextExtractor func(context.Context, []kgo.RecordHeader) context.Context
 }
@@ -128,11 +134,12 @@
 	reader := &KafkaReader{
-		client:      c,
-		topic:       cfg.Topic,
-		partitionID: partitionID,
-		metrics:     metrics,
-		logger:      logger,
+		client:         c,
+		topic:          cfg.Topic,
+		partitionID:    partitionID,
+		metrics:        metrics,
+		logger:         logger,
+		partitionState: "unknown",
 	}
 
 	// Apply functional options
@@ -159,6 +166,13 @@ func (r *KafkaReader) SetPhase(phase string) {
 	r.phase = phase
 }
 
+// SetPartitionState sets the partition ring state for the reader.
+func (r *KafkaReader) SetPartitionState(state string) {
+	r.partitionStateMu.Lock()
+	defer r.partitionStateMu.Unlock()
+	r.partitionState = state
+}
+
 // Poll retrieves the next batch of records from Kafka
 // Number of records fetched can be limited by configuring maxPollRecords to a non-zero value.
 func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) {
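The reader.go hunks above pair a mutex-guarded writer (SetPartitionState, called from a background monitor introduced later in this patch) with lock-protected reads on Poll's hot path. A minimal, self-contained sketch of that read-mostly locking pattern, with every name here invented for illustration rather than taken from the patch:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stateHolder mirrors the shape of KafkaReader's partitionState handling:
// rare writes take the exclusive lock, frequent reads take the shared one.
type stateHolder struct {
	mu    sync.RWMutex
	state string
}

func (h *stateHolder) set(s string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = s
}

func (h *stateHolder) get() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func main() {
	h := &stateHolder{state: "unknown"}

	// Background writer, analogous to the state monitor goroutine.
	go func() {
		time.Sleep(10 * time.Millisecond)
		h.set("active")
	}()

	// Hot-path readers, analogous to Poll capturing the state per fetch.
	for i := 0; i < 3; i++ {
		fmt.Println(h.get())
		time.Sleep(10 * time.Millisecond)
	}
}
```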
@@ -166,15 +180,26 @@ func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, e
 	fetches := r.client.PollRecords(ctx, maxPollRecords)
 	r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
 
+	// Capture current partition state once for consistent metric labeling
+	r.partitionStateMu.RLock()
+	partitionState := r.partitionState
+	r.partitionStateMu.RUnlock()
+
 	// Record metrics
 	r.metrics.fetchesTotal.Add(float64(len(fetches)))
 	var numRecords int
 	fetches.EachRecord(func(record *kgo.Record) {
 		numRecords++
-		r.metrics.consumptionLag.WithLabelValues(r.phase).Observe(time.Since(record.Timestamp).Seconds())
+		r.metrics.consumptionLag.WithLabelValues(r.phase, partitionState).Observe(time.Since(record.Timestamp).Seconds())
 	})
 	r.metrics.recordsPerFetch.Observe(float64(numRecords))
 
+	// If no records were fetched, observe lag as 0 (caught up)
+	// This ensures inactive partitions continue reporting metrics
+	if numRecords == 0 {
+		r.metrics.consumptionLag.WithLabelValues(r.phase, partitionState).Observe(0)
+	}
+
 	// Handle errors
 	var errs multierror.MultiError
 	fetches.EachError(func(topic string, partition int32, err error) {
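The Poll hunk labels every consumption-lag observation with the state captured at the top of the call, and additionally observes a lag of 0 on empty fetches so caught-up (typically inactive) partitions keep producing fresh samples. A rough standalone sketch of that behavior against the Prometheus client library; the label names mirror the patch, while the metric name and everything else are illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Two-label histogram, as in the NewReaderMetrics change above.
	lag := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "consumption_lag_seconds", // illustrative name only
		Buckets: prometheus.ExponentialBuckets(0.125, 2, 18),
	}, []string{"phase", "partition_state"})

	// Non-empty fetch: observe the real per-record lag.
	recordTs := time.Now().Add(-2 * time.Second)
	lag.WithLabelValues("running", "active").Observe(time.Since(recordTs).Seconds())

	// Empty fetch: observe 0 so a caught-up (e.g. inactive) partition keeps
	// emitting samples instead of leaving a stale last observation.
	lag.WithLabelValues("running", "inactive").Observe(0)

	fmt.Println("observed lag samples for both partition states")
}
```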
diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go
index 4862a21616575..b112b32df92f4 100644
--- a/pkg/kafka/partition/reader_service.go
+++ b/pkg/kafka/partition/reader_service.go
@@ -9,6 +9,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -22,6 +23,11 @@ const (
 	phaseRunning = "running"
 )
 
+// StateProvider provides access to partition ring state.
+type StateProvider interface {
+	GetPartitionState(ctx context.Context) (ring.PartitionState, time.Time, error)
+}
+
 type ConsumerFactory func(committer Committer, logger log.Logger) (Consumer, error)
 
 type Consumer interface {
@@ -59,6 +65,7 @@ type ReaderService struct {
 	metrics       *serviceMetrics
 	committer     *partitionCommitter
 	partitionID   int32
+	stateProvider StateProvider
 
 	lastProcessedOffset int64
 }
@@ -77,6 +84,7 @@ func NewReaderService(
 	consumerFactory ConsumerFactory,
 	logger log.Logger,
 	reg prometheus.Registerer,
+	stateProvider StateProvider,
 	readerOpts ...ReaderOption,
 ) (*ReaderService, error) {
 	readerMetrics := NewReaderMetrics(reg)
@@ -106,6 +114,7 @@
 		consumerFactory,
 		logger,
 		reg,
+		stateProvider,
 	), nil
 }
 
@@ -117,6 +126,7 @@ func newReaderService(
 	consumerFactory ConsumerFactory,
 	logger log.Logger,
 	reg prometheus.Registerer,
+	stateProvider StateProvider,
 ) *ReaderService {
 	s := &ReaderService{
 		cfg: cfg,
@@ -127,6 +137,7 @@ func newReaderService(
 		logger:              log.With(logger, "partition", partitionID, "consumer_group", offsetManager.ConsumerGroup()),
 		metrics:             newServiceMetrics(reg),
 		lastProcessedOffset: int64(KafkaEndOffset),
+		stateProvider:       stateProvider,
 	}
 
 	// Create the committer
@@ -175,14 +186,21 @@ func (s *ReaderService) running(ctx context.Context) error {
 	s.metrics.reportRunning()
 	s.reader.SetPhase(phaseRunning)
 
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Set initial partition state synchronously before starting polling
+	if s.stateProvider != nil {
+		s.updatePartitionState(ctx)
+		// Start monitoring partition state changes in background
+		go s.monitorPartitionState(ctx)
+	}
+
 	consumer, err := s.consumerFactory(s.committer, log.With(s.logger, "phase", phaseRunning))
 	if err != nil {
 		return fmt.Errorf("creating consumer: %w", err)
 	}
 
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
 	recordsChan := s.startFetchLoop(ctx)
 	wait := consumer.Start(ctx, recordsChan)
 	wait()
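running() now resolves the partition state once synchronously, keeps refreshing it in a background goroutine, and tolerates a nil StateProvider (which the test change at the end of this diff relies on). Judging by the first two hunks of the patch, the production provider is presumably dskit's partition ring lifecycler, which already exposes a GetPartitionState method with this shape; for tests or other callers, any stub satisfying the interface should do. A hypothetical example, with fakeStateProvider being an invented name:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/ring"
)

// fakeStateProvider satisfies the patch's StateProvider interface with a
// fixed state, e.g. for unit tests that need deterministic transitions.
type fakeStateProvider struct {
	state ring.PartitionState
}

func (f *fakeStateProvider) GetPartitionState(_ context.Context) (ring.PartitionState, time.Time, error) {
	// Report the fixed state, stamped with the current time.
	return f.state, time.Now(), nil
}

func main() {
	p := &fakeStateProvider{state: ring.PartitionActive}
	state, ts, err := p.GetPartitionState(context.Background())
	fmt.Println(state, ts, err)
}
```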
@@ -373,3 +391,52 @@ func loggerWithCurrentLagIfSet(logger log.Logger, currentLag time.Duration) log.
 
 	return log.With(logger, "current_lag", currentLag)
 }
+
+func (s *ReaderService) monitorPartitionState(ctx context.Context) {
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			s.updatePartitionState(ctx)
+		}
+	}
+}
+
+func (s *ReaderService) updatePartitionState(ctx context.Context) {
+	state, _, err := s.stateProvider.GetPartitionState(ctx)
+	if err != nil {
+		level.Warn(s.logger).Log("msg", "failed to get partition state", "err", err)
+		s.reader.SetPartitionState("unknown")
+		return
+	}
+
+	var stateStr string
+	switch state {
+	case ring.PartitionActive:
+		stateStr = "active"
+	case ring.PartitionInactive:
+		stateStr = "inactive"
+	case ring.PartitionPending:
+		stateStr = "pending"
+	default:
+		stateStr = "unknown"
+	}
+
+	// Get current state to detect changes
+	currentReader, ok := s.reader.(*KafkaReader)
+	if ok {
+		currentReader.partitionStateMu.RLock()
+		oldState := currentReader.partitionState
+		currentReader.partitionStateMu.RUnlock()
+
+		if oldState != stateStr {
+			level.Info(s.logger).Log("msg", "partition state changed", "old_state", oldState, "new_state", stateStr)
+		}
+	}
+
+	s.reader.SetPartitionState(stateStr)
+}
diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go
index e8d425cba1ead..6e7388677dc48 100644
--- a/pkg/kafka/partition/reader_test.go
+++ b/pkg/kafka/partition/reader_test.go
@@ -73,6 +73,7 @@ func readersFromKafkaCfg(
 		consumerFactory,
 		log.NewNopLogger(),
 		nil,
+		nil, // stateProvider is optional in tests
 	)
 	require.NoError(t, err)
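Note that updatePartitionState downcasts to *KafkaReader purely to log state transitions, so change logging is silently skipped for any other Reader implementation, and the service reaches into the reader's mutex from outside. A possible variation, not part of the patch, that keeps change detection inside ReaderService via a hypothetical lastState string field on the struct; everything else mirrors the patch's own function:

```go
func (s *ReaderService) updatePartitionState(ctx context.Context) {
	state, _, err := s.stateProvider.GetPartitionState(ctx)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to get partition state", "err", err)
		s.reader.SetPartitionState("unknown")
		return
	}

	stateStr := "unknown"
	switch state {
	case ring.PartitionActive:
		stateStr = "active"
	case ring.PartitionInactive:
		stateStr = "inactive"
	case ring.PartitionPending:
		stateStr = "pending"
	}

	// updatePartitionState is only invoked from running() and then from the
	// single monitor goroutine, so an unsynchronized field is arguably enough.
	if s.lastState != stateStr {
		level.Info(s.logger).Log("msg", "partition state changed", "old_state", s.lastState, "new_state", stateStr)
		s.lastState = stateStr
	}

	s.reader.SetPartitionState(stateStr)
}
```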