Skip to content

Commit ddfb6d8

Browse files
committed
Create intervals when etcd disk metrics are over upstream recommended thresholds
1 parent 4faf768 commit ddfb6d8

File tree

3 files changed

+235
-2
lines changed

3 files changed

+235
-2
lines changed

pkg/defaultmonitortests/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/openshift/origin/pkg/monitortests/testframework/disruptionexternalservicemonitoring"
5050
"github.com/openshift/origin/pkg/monitortests/testframework/disruptionserializer"
5151
"github.com/openshift/origin/pkg/monitortests/testframework/e2etestanalyzer"
52+
"github.com/openshift/origin/pkg/monitortests/testframework/etcddiskmetricsintervals"
5253
"github.com/openshift/origin/pkg/monitortests/testframework/highcpumetriccollector"
5354
"github.com/openshift/origin/pkg/monitortests/testframework/highcputestanalyzer"
5455

@@ -176,6 +177,7 @@ func newUniversalMonitorTests(info monitortestframework.MonitorTestInitializatio
176177

177178
monitorTestRegistry.AddMonitorTestOrDie("etcd-log-analyzer", "etcd", etcdloganalyzer.NewEtcdLogAnalyzer())
178179
monitorTestRegistry.AddMonitorTestOrDie("legacy-etcd-invariants", "etcd", legacyetcdmonitortests.NewLegacyTests())
180+
monitorTestRegistry.AddMonitorTestOrDie("etcd-disk-metrics-intervals", "etcd", etcddiskmetricsintervals.NewEtcdDiskMetricsCollector())
179181

180182
monitorTestRegistry.AddMonitorTestOrDie("audit-log-analyzer", "kube-apiserver", auditloganalyzer.NewAuditLogAnalyzer(info))
181183
monitorTestRegistry.AddMonitorTestOrDie("legacy-kube-apiserver-invariants", "kube-apiserver", legacykubeapiservermonitortests.NewLegacyTests())

pkg/monitor/monitorapi/types.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,10 @@ const (
383383

384384
SourceGenerationMonitor IntervalSource = "GenerationMonitor"
385385

386-
SourceStaticPodInstallMonitor IntervalSource = "StaticPodInstallMonitor"
387-
SourceCPUMonitor IntervalSource = "CPUMonitor"
386+
SourceStaticPodInstallMonitor IntervalSource = "StaticPodInstallMonitor"
387+
SourceCPUMonitor IntervalSource = "CPUMonitor"
388+
SourceEtcdDiskCommitDuration IntervalSource = "EtcdDiskCommitDuration"
389+
SourceEtcdDiskWalFsyncDuration IntervalSource = "EtcdDiskWalFsyncDuration"
388390
)
389391

390392
type Interval struct {
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package etcddiskmetricsintervals
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
routeclient "github.com/openshift/client-go/route/clientset/versioned"
9+
"github.com/openshift/library-go/test/library/metrics"
10+
"github.com/openshift/origin/pkg/monitor/monitorapi"
11+
"github.com/openshift/origin/pkg/monitortestframework"
12+
"github.com/openshift/origin/pkg/monitortestlibrary/prometheus"
13+
"github.com/openshift/origin/pkg/test/ginkgo/junitapi"
14+
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
15+
prometheustypes "github.com/prometheus/common/model"
16+
"github.com/sirupsen/logrus"
17+
apierrors "k8s.io/apimachinery/pkg/api/errors"
18+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/client-go/kubernetes"
20+
"k8s.io/client-go/rest"
21+
)
22+
23+
// etcdDiskMetricsCollector is a monitor test that queries Prometheus for etcd
// disk performance metrics and records intervals whenever a pod's p99 durations
// exceed the upstream etcd recommended thresholds.
type etcdDiskMetricsCollector struct {
	// adminRESTConfig is captured in StartCollection and used by CollectData
	// to build the kube, route, and prometheus clients.
	adminRESTConfig *rest.Config
	// commitDurationThreshold is the p99 backend commit duration (seconds)
	// above which an interval is created.
	commitDurationThreshold float64
	// walFsyncThreshold is the p99 WAL fsync duration (seconds) above which
	// an interval is created.
	walFsyncThreshold float64
}
28+
29+
func NewEtcdDiskMetricsCollector() monitortestframework.MonitorTest {
30+
return &etcdDiskMetricsCollector{
31+
commitDurationThreshold: 0.025, // 25ms threshold, defined upstream
32+
walFsyncThreshold: 0.01, // 10ms threshold, defined upstream
33+
}
34+
}
35+
36+
// PrepareCollection implements monitortestframework.MonitorTest; this
// collector needs no preparation before the run begins.
func (w *etcdDiskMetricsCollector) PrepareCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error {
	return nil
}
39+
40+
// StartCollection stashes the admin REST config for later use by CollectData.
// No live collection is started here; the metrics are queried from Prometheus
// after the fact.
func (w *etcdDiskMetricsCollector) StartCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error {
	w.adminRESTConfig = adminRESTConfig
	return nil
}
44+
45+
func (w *etcdDiskMetricsCollector) CollectData(ctx context.Context, storageDir string, beginning, end time.Time) (monitorapi.Intervals, []*junitapi.JUnitTestCase, error) {
46+
logger := logrus.WithField("MonitorTest", "EtcdDiskMetricsCollector")
47+
48+
intervals, err := w.buildIntervalsForEtcdDiskMetrics(ctx, w.adminRESTConfig, beginning)
49+
if err != nil {
50+
return nil, nil, err
51+
}
52+
53+
logger.Infof("collected %d etcd disk metrics intervals", len(intervals))
54+
return intervals, nil, nil
55+
}
56+
57+
func (w *etcdDiskMetricsCollector) buildIntervalsForEtcdDiskMetrics(ctx context.Context, restConfig *rest.Config, startTime time.Time) ([]monitorapi.Interval, error) {
58+
logger := logrus.WithField("func", "buildIntervalsForEtcdDiskMetrics")
59+
kubeClient, err := kubernetes.NewForConfig(restConfig)
60+
if err != nil {
61+
return nil, err
62+
}
63+
routeClient, err := routeclient.NewForConfig(restConfig)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
_, err = kubeClient.CoreV1().Namespaces().Get(ctx, "openshift-monitoring", metav1.GetOptions{})
69+
if apierrors.IsNotFound(err) {
70+
return []monitorapi.Interval{}, nil
71+
} else if err != nil {
72+
return nil, err
73+
}
74+
75+
prometheusClient, err := metrics.NewPrometheusClient(ctx, kubeClient, routeClient)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
if _, err := prometheus.EnsureThanosQueriersConnectedToPromSidecars(ctx, prometheusClient); err != nil {
81+
return nil, err
82+
}
83+
84+
timeRange := prometheusv1.Range{
85+
Start: startTime,
86+
End: time.Now(),
87+
Step: 30 * time.Second, // Sample every 30 seconds for better granularity
88+
}
89+
90+
var allIntervals []monitorapi.Interval
91+
92+
// Query for etcd disk backend commit duration over upstream guidance
93+
commitDurationQuery := `histogram_quantile(0.99, rate(etcd_disk_backend_commit_duration_seconds_bucket{job=~".*etcd.*"}[5m]))`
94+
commitMetrics, warningsForCommit, err := prometheusClient.QueryRange(ctx, commitDurationQuery, timeRange)
95+
if err != nil {
96+
return nil, err
97+
}
98+
if len(warningsForCommit) > 0 {
99+
for _, w := range warningsForCommit {
100+
logger.Warnf("Commit duration metric query warning: %s", w)
101+
}
102+
}
103+
104+
commitIntervals, err := w.createIntervalsFromMetrics(logger, commitMetrics, monitorapi.SourceEtcdDiskCommitDuration, w.commitDurationThreshold, "disk backend commit duration")
105+
if err != nil {
106+
return nil, err
107+
}
108+
allIntervals = append(allIntervals, commitIntervals...)
109+
110+
// Query for etcd disk WAL fsync duration over upstream guidance
111+
walFsyncQuery := `histogram_quantile(0.99, rate(etcd_disk_wal_fsync_duration_seconds_bucket{job=~".*etcd.*"}[5m]))`
112+
walFsyncMetrics, warningsForWal, err := prometheusClient.QueryRange(ctx, walFsyncQuery, timeRange)
113+
if err != nil {
114+
return nil, err
115+
}
116+
if len(warningsForWal) > 0 {
117+
for _, w := range warningsForWal {
118+
logger.Warnf("WAL fsync metric query warning: %s", w)
119+
}
120+
}
121+
122+
walFsyncIntervals, err := w.createIntervalsFromMetrics(logger, walFsyncMetrics, monitorapi.SourceEtcdDiskWalFsyncDuration, w.walFsyncThreshold, "disk WAL fsync duration")
123+
if err != nil {
124+
return nil, err
125+
}
126+
allIntervals = append(allIntervals, walFsyncIntervals...)
127+
128+
return allIntervals, nil
129+
}
130+
131+
func (w *etcdDiskMetricsCollector) createIntervalsFromMetrics(logger logrus.FieldLogger, promVal prometheustypes.Value, source monitorapi.IntervalSource, threshold float64, metricType string) ([]monitorapi.Interval, error) {
132+
ret := []monitorapi.Interval{}
133+
134+
switch {
135+
case promVal.Type() == prometheustypes.ValMatrix:
136+
promMatrix := promVal.(prometheustypes.Matrix)
137+
for _, promSampleStream := range promMatrix {
138+
pod := string(promSampleStream.Metric["pod"])
139+
ns := string(promSampleStream.Metric["namespace"])
140+
141+
// Create locator for the pod - etcd pods are typically in openshift-etcd namespace
142+
// but we'll use empty namespace and uid as we don't have them from the metrics
143+
locator := monitorapi.NewLocator().PodFromNames(ns, pod, "")
144+
145+
// Track consecutive high duration periods
146+
var highDurationStart *time.Time
147+
var highDurationEnd *time.Time
148+
var peakDuration float64
149+
150+
for _, currValue := range promSampleStream.Values {
151+
currTime := currValue.Timestamp.Time()
152+
duration := float64(currValue.Value)
153+
154+
// Check if duration exceeds threshold
155+
if duration > threshold {
156+
// If not currently in a high duration period, start a new one
157+
if highDurationStart == nil {
158+
highDurationStart = &currTime
159+
peakDuration = duration
160+
} else {
161+
// Continue the current high duration period, track peak duration
162+
if duration > peakDuration {
163+
peakDuration = duration
164+
}
165+
}
166+
// Always update the end time to current time for continuous high duration
167+
highDurationEnd = &currTime
168+
} else {
169+
// Duration dropped below threshold
170+
if highDurationStart != nil && highDurationEnd != nil {
171+
// Create interval for the high duration period that just ended
172+
ret = append(ret, w.createDiskMetricInterval(locator, pod, *highDurationStart, *highDurationEnd, peakDuration, source, threshold, metricType))
173+
// Reset tracking variables
174+
highDurationStart = nil
175+
highDurationEnd = nil
176+
peakDuration = 0
177+
}
178+
}
179+
}
180+
181+
// Handle case where high duration period extends to the end of the monitoring window
182+
if highDurationStart != nil && highDurationEnd != nil {
183+
ret = append(ret, w.createDiskMetricInterval(locator, pod, *highDurationStart, *highDurationEnd, peakDuration, source, threshold, metricType))
184+
}
185+
}
186+
187+
default:
188+
logger.WithField("type", promVal.Type()).Warning("unhandled prometheus value type received")
189+
}
190+
191+
return ret, nil
192+
}
193+
194+
func (w *etcdDiskMetricsCollector) createDiskMetricInterval(locator monitorapi.Locator, pod string, start, end time.Time, peakDuration float64, source monitorapi.IntervalSource, threshold float64, metricType string) monitorapi.Interval {
195+
// Create message with all necessary information
196+
msgBuilder := monitorapi.NewMessage().
197+
Reason(monitorapi.IntervalReason("HighEtcdDiskDuration")).
198+
HumanMessage(fmt.Sprintf("Etcd %s above upstream recommended %.3fs threshold on pod %s", metricType, threshold, pod)).
199+
WithAnnotation("duration_threshold", fmt.Sprintf("%.3f", threshold))
200+
201+
if peakDuration > 0 {
202+
msgBuilder = msgBuilder.WithAnnotation("peak_duration", fmt.Sprintf("%.6f", peakDuration))
203+
}
204+
205+
// Create and build the interval directly with the appropriate source
206+
interval := monitorapi.NewInterval(source, monitorapi.Warning).
207+
Locator(locator).
208+
Message(msgBuilder).
209+
Display()
210+
211+
return interval.Build(start, end)
212+
}
213+
214+
// ConstructComputedIntervals implements monitortestframework.MonitorTest; all
// intervals are produced directly in CollectData, so nothing is computed here.
func (*etcdDiskMetricsCollector) ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, recordedResources monitorapi.ResourcesMap, beginning, end time.Time) (monitorapi.Intervals, error) {
	return nil, nil
}
217+
218+
// EvaluateTestsFromConstructedIntervals implements
// monitortestframework.MonitorTest and intentionally produces no junit tests.
func (w *etcdDiskMetricsCollector) EvaluateTestsFromConstructedIntervals(ctx context.Context, finalIntervals monitorapi.Intervals) ([]*junitapi.JUnitTestCase, error) {
	// This monitor test is purely for data collection, not for generating test cases
	return nil, nil
}
222+
223+
// WriteContentToStorage implements monitortestframework.MonitorTest; this
// collector persists nothing beyond the intervals it already returned.
func (*etcdDiskMetricsCollector) WriteContentToStorage(ctx context.Context, storageDir, timeSuffix string, finalIntervals monitorapi.Intervals, finalResourceState monitorapi.ResourcesMap) error {
	return nil
}
226+
227+
// Cleanup implements monitortestframework.MonitorTest; no resources are held.
func (*etcdDiskMetricsCollector) Cleanup(ctx context.Context) error {
	return nil
}

0 commit comments

Comments
 (0)