Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-percentage` to instrument leaked references to gRPC buffers. #13609
* [ENHANCEMENT] Ingester: Reduce likelihood of ingestion being paused while idle TSDB compaction is in progress. #13978
* [ENHANCEMENT] Ingester: Extend `cortex_ingester_tsdb_forced_compactions_in_progress` metric to report a value of 1 when there's an idle or forced TSDB head compaction in progress. #13979
* [ENHANCEMENT] MQE: Include metric name in `histogram_quantile` warning/info annotations when delayed name removal is enabled. #13905
* [BUGFIX] Distributor: Fix issue where distributors didn't send custom values of native histograms. #13849
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
* [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
Expand Down
162 changes: 107 additions & 55 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,17 @@ func TestOurTestCases(t *testing.T) {

testScript := string(b)

t.Run("Mimir's engine", func(t *testing.T) {
if strings.Contains(testFile, "name_label_dropping") {
promqltest.RunTest(t, testScript, mimirEngineWithDelayedNameRemoval)
return
}
mimirEngineToTest := mimirEngine
prometheusEngineToTest := prometheusEngine

// switch to the alternate engines if we need delayed name removal
if strings.Contains(testFile, "name_label_dropping") || strings.Contains(testFile, "delayed_name_removal_enabled") {
mimirEngineToTest = mimirEngineWithDelayedNameRemoval
prometheusEngineToTest = prometheusEngineWithDelayedNameRemoval
}

promqltest.RunTest(t, testScript, mimirEngine)
t.Run("Mimir's engine", func(t *testing.T) {
promqltest.RunTest(t, testScript, mimirEngineToTest)
})

// Run the tests against Prometheus' engine to ensure our test cases are valid.
Expand All @@ -264,12 +268,7 @@ func TestOurTestCases(t *testing.T) {
t.Skip("disabled for Prometheus' engine due to bug in Prometheus' engine")
}

if strings.Contains(testFile, "name_label_dropping") {
promqltest.RunTest(t, testScript, prometheusEngineWithDelayedNameRemoval)
return
}

promqltest.RunTest(t, testScript, prometheusEngine)
promqltest.RunTest(t, testScript, prometheusEngineToTest)
})
})
}
Expand Down Expand Up @@ -2204,32 +2203,69 @@ func (t *timeoutTestingQueryTracker) Close() error {
}

type annotationTestCase struct {
data string
expr string
expectedWarningAnnotations []string
expectedInfoAnnotations []string
data string
expr string
expectedWarningAnnotations []string
expectedInfoAnnotations []string

// An alternate set of expected annotations for when delayed name removal is enabled.
// If a field is nil, the test falls back to expectedWarningAnnotations / expectedInfoAnnotations respectively.
expectedWarningAnnotationsDelayedNameRemovalEnabled []string
expectedInfoAnnotationsDelayedNameRemovalEnabled []string

skipComparisonWithPrometheusReason string
instantEvaluationTimestamp *time.Time
}

// getExpectedInfoAnnotations returns the info annotations the test case expects
// for the given delayed name removal setting.
//
// The delayed-name-removal-specific expectations are used only when
// delayedNameRemovalEnabled is true and that field is non-nil (an empty but
// non-nil slice still counts as set); otherwise expectedInfoAnnotations is returned.
func (a annotationTestCase) getExpectedInfoAnnotations(delayedNameRemovalEnabled bool) []string {
	override := a.expectedInfoAnnotationsDelayedNameRemovalEnabled
	if !delayedNameRemovalEnabled || override == nil {
		return a.expectedInfoAnnotations
	}
	return override
}

// getExpectedWarningAnnotations returns the warning annotations the test case
// expects for the given delayed name removal setting.
//
// The delayed-name-removal-specific expectations are used only when
// delayedNameRemovalEnabled is true and that field is non-nil (an empty but
// non-nil slice still counts as set); otherwise expectedWarningAnnotations is returned.
func (a annotationTestCase) getExpectedWarningAnnotations(delayedNameRemovalEnabled bool) []string {
	override := a.expectedWarningAnnotationsDelayedNameRemovalEnabled
	if !delayedNameRemovalEnabled || override == nil {
		return a.expectedWarningAnnotations
	}
	return override
}

func runAnnotationTests(t *testing.T, testCases map[string]annotationTestCase) {
startT := timestamp.Time(0).Add(time.Minute)
step := time.Minute
endT := startT.Add(2 * step)

opts := NewTestEngineOpts()
planner, err := NewQueryPlanner(opts, NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner)
require.NoError(t, err)
prometheusEngine := promql.NewEngine(opts.CommonOpts)

const prometheusEngineName = "Prometheus' engine"
engines := map[string]promql.QueryEngine{
"Mimir's engine": mimirEngine,
const mimirEngineName = "Mimir's engine"

// Create two sets of engines: one Mimir/Prometheus pair with EnableDelayedNameRemoval=true and another pair with EnableDelayedNameRemoval=false.
// Some histogram annotation test cases emit a different warning/info annotation string depending on the delayed name removal setting.
engineSets := make([]struct {
mimirEngine promql.QueryEngine
prometheusEngine promql.QueryEngine
delayedNameRemovalEnabled bool
}, 0, 2)

for _, delayedNameRemovalEnabled := range []bool{true, false} {
opts := NewTestEngineOpts()
opts.CommonOpts.EnableDelayedNameRemoval = delayedNameRemovalEnabled

// Compare against Prometheus' engine to verify our test cases are valid.
prometheusEngineName: prometheusEngine,
planner, err := NewQueryPlanner(opts, NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner)
require.NoError(t, err)
prometheusEngine := promql.NewEngine(opts.CommonOpts)

engineSets = append(engineSets, struct {
mimirEngine promql.QueryEngine
prometheusEngine promql.QueryEngine
delayedNameRemovalEnabled bool
}{
mimirEngine: mimirEngine,
prometheusEngine: prometheusEngine,
delayedNameRemovalEnabled: delayedNameRemovalEnabled,
})
}

for name, testCase := range testCases {
Expand All @@ -2253,36 +2289,43 @@ func runAnnotationTests(t *testing.T, testCases map[string]annotationTestCase) {
}

for queryType, generator := range queryTypes {
t.Run(queryType, func(t *testing.T) {
results := make([]*promql.Result, 0, 2)

for engineName, engine := range engines {
if engineName == prometheusEngineName && testCase.skipComparisonWithPrometheusReason != "" {
t.Logf("Skipping comparison with Prometheus' engine: %v", testCase.skipComparisonWithPrometheusReason)
continue
for _, engineSet := range engineSets {
subTestName := fmt.Sprintf("%s - delayed name removal enabled=%t", queryType, engineSet.delayedNameRemovalEnabled)
t.Run(subTestName, func(t *testing.T) {
results := make([]*promql.Result, 0, 2)

for _, engine := range []promql.QueryEngine{engineSet.mimirEngine, engineSet.prometheusEngine} {
if engine == engineSet.prometheusEngine && testCase.skipComparisonWithPrometheusReason != "" {
t.Logf("Skipping comparison with Prometheus' engine: %v", testCase.skipComparisonWithPrometheusReason)
continue
}
engineName := mimirEngineName
if engine == engineSet.prometheusEngine {
engineName = prometheusEngineName
}
t.Run(engineName, func(t *testing.T) {
query, err := generator(engine)
require.NoError(t, err)
t.Cleanup(query.Close)

res := query.Exec(context.Background())
require.NoError(t, res.Err)
results = append(results, res)

warnings, infos := res.Warnings.AsStrings(testCase.expr, 0, 0)
require.ElementsMatch(t, testCase.getExpectedWarningAnnotations(engineSet.delayedNameRemovalEnabled), warnings)
require.ElementsMatch(t, testCase.getExpectedInfoAnnotations(engineSet.delayedNameRemovalEnabled), infos)
})
}
t.Run(engineName, func(t *testing.T) {
query, err := generator(engine)
require.NoError(t, err)
t.Cleanup(query.Close)

res := query.Exec(context.Background())
require.NoError(t, res.Err)
results = append(results, res)

warnings, infos := res.Warnings.AsStrings(testCase.expr, 0, 0)
require.ElementsMatch(t, testCase.expectedWarningAnnotations, warnings)
require.ElementsMatch(t, testCase.expectedInfoAnnotations, infos)
})
}

// If both results are available, compare them (sometimes we skip prometheus)
if len(results) == 2 {
// We do this extra comparison to ensure that we don't skip a series that may be outputted during a warning
// or vice-versa where no result may be expected etc.
mqetest.RequireEqualResults(t, testCase.expr, results[0], results[1], false)
}
})
// If both results are available, compare them (sometimes we skip prometheus)
if len(results) == 2 {
// We do this extra comparison to ensure that we don't skip a series that may be outputted during a warning
// or vice-versa where no result may be expected etc.
mqetest.RequireEqualResults(t, testCase.expr, results[0], results[1], false)
}
})
}
}
})
}
Expand Down Expand Up @@ -3144,6 +3187,7 @@ func TestHistogramAnnotations(t *testing.T) {
data: mixedClassicHistograms,
expr: `histogram_quantile(0.5, series{host="c"})`,
expectedWarningAnnotations: []string{`PromQL warning: bucket label "le" is missing or has a malformed value of "abc" (1:25)`},
expectedWarningAnnotationsDelayedNameRemovalEnabled: []string{`PromQL warning: bucket label "le" is missing or has a malformed value of "abc" for metric name "series" (1:25)`},
},
"invalid quantile warning": {
data: mixedClassicHistograms,
Expand All @@ -3154,11 +3198,13 @@ func TestHistogramAnnotations(t *testing.T) {
data: mixedClassicHistograms,
expr: `histogram_quantile(0.5, series{host="a"})`,
expectedWarningAnnotations: []string{`PromQL warning: vector contains a mix of classic and native histograms (1:25)`},
expectedWarningAnnotationsDelayedNameRemovalEnabled: []string{`PromQL warning: vector contains a mix of classic and native histograms for metric name "series" (1:25)`},
},
"forced monotonicity info": {
data: mixedClassicHistograms,
expr: `histogram_quantile(0.5, series{host="d"})`,
expectedInfoAnnotations: []string{`PromQL info: input to histogram_quantile needed to be fixed for monotonicity (see https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile) (1:25)`},
expectedInfoAnnotationsDelayedNameRemovalEnabled: []string{`PromQL info: input to histogram_quantile needed to be fixed for monotonicity (see https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile) for metric name "series" (1:25)`},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realise this isn't something we can control, but I find the expression here hard to follow - the "for metric name ..." bit seems out of place after the parentheses, it fits better before the parentheses to me.

Perhaps something to change upstream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which of these do you prefer?

  • The histogram_quantile input for metric "series" was corrected to enforce monotonicity (1:25) - see ....
  • The histogram_quantile input for metric "series" was fixed to enforce monotonicity (1:25) - see ....
  • Corrected non-monotonic histogram_quantile input for metric "series" (1:25) - see ....
  • Fixed non-monotonic histogram_quantile input for metric "series" (1:25) - see ....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with tweaking the old structure:

input to histogram_quantile needed to be fixed for monotonicity for metric name "series" (see https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile)

},
"both mixed classic+native histogram and invalid quantile warnings": {
data: mixedClassicHistograms,
Expand All @@ -3167,6 +3213,10 @@ func TestHistogramAnnotations(t *testing.T) {
`PromQL warning: vector contains a mix of classic and native histograms (1:23)`,
`PromQL warning: quantile value should be between 0 and 1, got 9 (1:20)`,
},
expectedWarningAnnotationsDelayedNameRemovalEnabled: []string{
`PromQL warning: vector contains a mix of classic and native histograms for metric name "series" (1:23)`,
`PromQL warning: quantile value should be between 0 and 1, got 9 (1:20)`,
},
},
"forced monotonicity info is not emitted when quantile is invalid": {
data: mixedClassicHistograms,
Expand All @@ -3179,6 +3229,7 @@ func TestHistogramAnnotations(t *testing.T) {
`,
expr: `histogram_quantile(0.5, series{})`,
expectedWarningAnnotations: []string{`PromQL warning: bucket label "le" is missing or has a malformed value of "" (1:25)`},
expectedWarningAnnotationsDelayedNameRemovalEnabled: []string{`PromQL warning: bucket label "le" is missing or has a malformed value of "" for metric name "series" (1:25)`},
},
"extra entry in series without le label": {
data: `
Expand All @@ -3187,6 +3238,7 @@ func TestHistogramAnnotations(t *testing.T) {
`,
expr: `histogram_quantile(0.5, series{})`,
expectedWarningAnnotations: []string{`PromQL warning: bucket label "le" is missing or has a malformed value of "" (1:25)`},
expectedWarningAnnotationsDelayedNameRemovalEnabled: []string{`PromQL warning: bucket label "le" is missing or has a malformed value of "" for metric name "series" (1:25)`},
},
}

Expand Down
36 changes: 27 additions & 9 deletions pkg/streamingpromql/operators/functions/histogram_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ import (

const (
// intentionallyEmptyMetricName exists for annotations compatibility with prometheus.
// Now prometheus has delayed __name__ removal. Mimir doesn't yet, and we always remove __name__.
// Because of complication in implementing this in prometheus (see https://github.com/prometheus/prometheus/pull/16794),
// the name is always dropped if delayed __name__ removal is disabled.
// The name is dropped even if the function is the first one invoked on the vector selector.
// Mimir doesn't have delayed __name__ removal, so to stay close to prometheus, we never display the label in some annotations and warnings.
// This is only used for backwards compatibility when delayed __name__ removal is not enabled.
intentionallyEmptyMetricName = ""
)

Expand Down Expand Up @@ -136,6 +132,7 @@ func NewHistogramQuantileFunction(
annotations: annotations,
innerSeriesMetricNames: innerSeriesMetricNames,
innerExpressionPosition: inner.ExpressionPosition(),
enableDelayedNameRemoval: enableDelayedNameRemoval,
},
inner: inner,
memoryConsumptionTracker: memoryConsumptionTracker,
Expand Down Expand Up @@ -298,6 +295,16 @@ func (h *HistogramFunction) NextSeries(ctx context.Context) (types.InstantVector
return h.computeOutputSeriesForGroup(thisGroup)
}

// getMetricNameForSeries returns the metric name to use in annotations for the
// inner series at seriesIndex.
//
// When delayed name removal is enabled, the name is looked up in
// innerSeriesMetricNames. Otherwise intentionallyEmptyMetricName ("") is
// returned, without consulting innerSeriesMetricNames, to maintain
// compatibility with Prometheus.
func (h *HistogramFunction) getMetricNameForSeries(seriesIndex int) string {
	if !h.enableDelayedNameRemoval {
		return intentionallyEmptyMetricName
	}
	return h.innerSeriesMetricNames.GetMetricNameForSeries(seriesIndex)
}

// accumulateUntilGroupComplete gathers all the series associated with the given bucketGroup
// As each inner series is selected, it is added into its respective groups.
// This means a group other than the one we are focused on may get completed first, but we
Expand Down Expand Up @@ -346,7 +353,7 @@ func (h *HistogramFunction) saveFloatsToGroup(fPoints []promql.FPoint, le string
if err != nil {
// The le label was invalid. Record it:
h.annotations.Add(annotations.NewBadBucketLabelWarning(
intentionallyEmptyMetricName,
h.getMetricNameForSeries(g.lastInputSeriesIdx),
le,
h.inner.ExpressionPosition(),
))
Expand Down Expand Up @@ -427,7 +434,7 @@ func (h *HistogramFunction) computeOutputSeriesForGroup(g *bucketGroup) (types.I
// At this data point, we have classic histogram buckets and a native histogram with the same name and labels.
// No value is returned, so emit an annotation and continue.
h.annotations.Add(annotations.NewMixedClassicNativeHistogramsWarning(
intentionallyEmptyMetricName, h.inner.ExpressionPosition(),
h.getMetricNameForSeries(g.lastInputSeriesIdx), h.inner.ExpressionPosition(),
))
continue
}
Expand Down Expand Up @@ -546,6 +553,7 @@ type histogramQuantile struct {
annotations *annotations.Annotations
innerSeriesMetricNames *operators.MetricNames
innerExpressionPosition posrange.PositionRange
enableDelayedNameRemoval bool
}

func (q *histogramQuantile) LoadArguments(ctx context.Context) error {
Expand All @@ -568,13 +576,23 @@ func (q *histogramQuantile) LoadArguments(ctx context.Context) error {
return nil
}

// getMetricNameForSeries returns the metric name to use in annotations for the
// inner series at seriesIndex.
//
// When delayed name removal is enabled, the name is looked up in
// innerSeriesMetricNames. Otherwise intentionallyEmptyMetricName ("") is
// returned, without consulting innerSeriesMetricNames, to maintain
// compatibility with Prometheus.
func (q *histogramQuantile) getMetricNameForSeries(seriesIndex int) string {
	if !q.enableDelayedNameRemoval {
		return intentionallyEmptyMetricName
	}
	return q.innerSeriesMetricNames.GetMetricNameForSeries(seriesIndex)
}

func (q *histogramQuantile) ComputeClassicHistogramResult(pointIndex int, seriesIndex int, buckets promql.Buckets) float64 {
ph := q.phValues.Samples[pointIndex].F
res, forcedMonotonicity, _ := promql.BucketQuantile(ph, buckets)

if forcedMonotonicity {
q.annotations.Add(annotations.NewHistogramQuantileForcedMonotonicityInfo(
intentionallyEmptyMetricName,
q.getMetricNameForSeries(seriesIndex),
q.innerExpressionPosition,
))
}
Expand All @@ -584,7 +602,7 @@ func (q *histogramQuantile) ComputeClassicHistogramResult(pointIndex int, series

func (q *histogramQuantile) ComputeNativeHistogramResult(pointIndex int, seriesIndex int, h *histogram.FloatHistogram) (float64, annotations.Annotations) {
ph := q.phValues.Samples[pointIndex].F
return promql.HistogramQuantile(ph, h, q.innerSeriesMetricNames.GetMetricNameForSeries(seriesIndex), q.innerExpressionPosition)
return promql.HistogramQuantile(ph, h, q.getMetricNameForSeries(seriesIndex), q.innerExpressionPosition)
}

func (q *histogramQuantile) Prepare(ctx context.Context, params *types.PrepareParams) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,14 @@ func runTestCasesWithDelayedNameRemovalDisabled(t *testing.T, globPattern string
for _, testFile := range testFiles {
t.Run(testFile, func(t *testing.T) {
if strings.Contains(testFile, "name_label_dropping") {
t.Skip("name_label_dropping tests require delayed name removal to be enabled, but optimization pass requires it to be disabled")
t.Skip("name_label_dropping tests require delayed name removal to be enabled, but this test exercises the optimization pass with delayed name removal disabled")
}
if strings.Contains(testFile, "delayed_name_removal_enabled") {
t.Skip("delayed_name_removal_enabled tests require delayed name removal to be enabled, but this test exercises the optimization pass with delayed name removal disabled")
}
// Note that we get the equivalent test coverage from ours/native_histograms_delayed_name_removal_disabled.test
if strings.Contains(testFile, "upstream/native_histograms.test") {
t.Skip("upstream/native_histograms.test tests require delayed name removal to be enabled, but this test exercises the optimization pass with delayed name removal disabled")
}

f, err := testdataFS.Open(testFile)
Expand Down
Loading
Loading