diff --git a/kafka_exporter.go b/kafka_exporter.go index ab205441..4a1f7c8c 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -657,22 +657,27 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) e.mu.Lock() - if offset, ok := offset[topic][partition]; ok { - // If the topic is consumed by that consumer group, but no offset associated with the partition - // forcing lag to -1 to be able to alert on that + currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest) + if currentPartitionOffsetError != nil { + klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError) + } else { var lag int64 if offsetFetchResponseBlock.Offset == -1 { lag = -1 } else { - lag = offset - offsetFetchResponseBlock.Offset + if offset, ok := offset[topic][partition]; ok { + if currentPartitionOffset == -1 { + currentPartitionOffset = offset + } + } + lag = currentPartitionOffset - offsetFetchResponseBlock.Offset lagSum += lag } + ch <- prometheus.MustNewConstMetric( consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) - } else { - klog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition) - } + } e.mu.Unlock() } ch <- prometheus.MustNewConstMetric(