Skip to content

Commit 28262f6

Browse files
committed
fix: improve OOM controller stability and make test strict on false positives
- Add d_* PSI derivative values to the trigger expression context - Only trigger OOM action while PSI is rising - Make OOM test fail if controller kills a cgroup without stress-ng Signed-off-by: Dmitrii Sharshakov <[email protected]>
1 parent d69305a commit 28262f6

File tree

8 files changed

+87
-36
lines changed

8 files changed

+87
-36
lines changed

internal/app/machined/pkg/controllers/runtime/internal/oom/oom.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io/fs"
1111
"os"
1212
"path/filepath"
13+
"time"
1314

1415
"github.com/google/cel-go/common/types"
1516
"go.uber.org/zap"
@@ -56,12 +57,7 @@ func (cgroup *RankedCgroup) CalculateScore(expr *cel.Expression) (float64, error
5657

5758
// EvaluateTrigger is a method obtaining data and evaluating the trigger expression.
5859
// When the result is true, designated OOM action is to be executed.
59-
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgroup string) (bool, error) {
60-
err := PopulatePsiToCtx(cgroup, evalContext)
61-
if err != nil {
62-
return false, fmt.Errorf("cannot populate PSI context: %w", err)
63-
}
64-
60+
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any) (bool, error) {
6561
trigger, err := triggerExpr.EvalBool(celenv.OOMTrigger(), evalContext)
6662
if err != nil {
6763
return false, fmt.Errorf("cannot evaluate expression: %w", err)
@@ -71,7 +67,7 @@ func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgr
7167
}
7268

7369
// PopulatePsiToCtx populates the context with PSI data from a cgroup.
74-
func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
70+
func PopulatePsiToCtx(cgroup string, evalContext map[string]any, psi map[string]float64, sampleInterval time.Duration) error {
7571
node, err := cgroups.GetCgroupProperty(cgroup, "memory.pressure")
7672
if err != nil {
7773
return fmt.Errorf("cannot read memory pressure: %w", err)
@@ -93,7 +89,15 @@ func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
9389
return fmt.Errorf("PSI is not defined")
9490
}
9591

92+
diff := 0.
93+
94+
if oldValue, ok := psi["memory_"+psiType+"_"+span]; ok {
95+
diff = (value.Float64() - oldValue) / sampleInterval.Seconds()
96+
}
97+
98+
evalContext["d_memory_"+psiType+"_"+span] = diff
9699
evalContext["memory_"+psiType+"_"+span] = value.Float64()
100+
psi["memory_"+psiType+"_"+span] = value.Float64()
97101
}
98102
}
99103

internal/app/machined/pkg/controllers/runtime/internal/oom/oom_test.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,29 +125,45 @@ func TestPopulatePsiToCtx(t *testing.T) {
125125
dir: "./testdata/trigger-false",
126126
expectErr: "",
127127
expect: map[string]any{
128-
"memory_full_avg10": 2.4,
129-
"memory_full_avg300": 1.71,
130-
"memory_full_avg60": 5.16,
131-
"memory_full_total": 1.0654831e+07,
132-
"memory_some_avg10": 2.82,
133-
"memory_some_avg300": 1.97,
134-
"memory_some_avg60": 5.95,
135-
"memory_some_total": 1.217234e+07,
128+
"memory_full_avg10": 2.4,
129+
"memory_full_avg300": 1.71,
130+
"memory_full_avg60": 5.16,
131+
"memory_full_total": 1.0654831e+07,
132+
"memory_some_avg10": 2.82,
133+
"memory_some_avg300": 1.97,
134+
"memory_some_avg60": 5.95,
135+
"memory_some_total": 1.217234e+07,
136+
"d_memory_full_avg10": 0.0,
137+
"d_memory_full_avg300": 0.0,
138+
"d_memory_full_avg60": 0.0,
139+
"d_memory_full_total": 0.0,
140+
"d_memory_some_avg10": 0.0,
141+
"d_memory_some_avg300": 0.0,
142+
"d_memory_some_avg60": 0.0,
143+
"d_memory_some_total": 0.0,
136144
},
137145
},
138146
{
139147
name: "true",
140148
dir: "./testdata/trigger-true",
141149
expectErr: "",
142150
expect: map[string]any{
143-
"memory_full_avg10": 14.54,
144-
"memory_full_avg60": 6.97,
145-
"memory_full_avg300": 1.82,
146-
"memory_full_total": 1.0654831e+07,
147-
"memory_some_avg10": 17.06,
148-
"memory_some_avg60": 8.04,
149-
"memory_some_avg300": 2.1,
150-
"memory_some_total": 1.217234e+07,
151+
"memory_full_avg10": 14.54,
152+
"memory_full_avg60": 6.97,
153+
"memory_full_avg300": 1.82,
154+
"memory_full_total": 1.0654831e+07,
155+
"memory_some_avg10": 17.06,
156+
"memory_some_avg60": 8.04,
157+
"memory_some_avg300": 2.1,
158+
"memory_some_total": 1.217234e+07,
159+
"d_memory_full_avg10": 0.0,
160+
"d_memory_full_avg300": 0.0,
161+
"d_memory_full_avg60": 0.0,
162+
"d_memory_full_total": 0.0,
163+
"d_memory_some_avg10": 0.0,
164+
"d_memory_some_avg300": 0.0,
165+
"d_memory_some_avg60": 0.0,
166+
"d_memory_some_total": 0.0,
151167
},
152168
},
153169
} {
@@ -156,7 +172,7 @@ func TestPopulatePsiToCtx(t *testing.T) {
156172

157173
ctx := map[string]any{}
158174

159-
err := oom.PopulatePsiToCtx(test.dir, ctx)
175+
err := oom.PopulatePsiToCtx(test.dir, ctx, make(map[string]float64), 0)
160176

161177
if test.expectErr == "" {
162178
require.NoError(t, err)
@@ -241,7 +257,10 @@ func TestEvaluateTrigger(t *testing.T) {
241257
t.Run(test.name, func(t *testing.T) {
242258
t.Parallel()
243259

244-
trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx, test.dir)
260+
err := oom.PopulatePsiToCtx(test.dir, test.ctx, make(map[string]float64), 0)
261+
require.NoError(t, err)
262+
263+
trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx)
245264

246265
assert.Equal(t, test.expect, trigger)
247266

internal/app/machined/pkg/controllers/runtime/oom.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type OOMController struct {
4646
V1Alpha1Mode runtime.Mode
4747
actionLog []actionLogItem
4848
idSeq int
49+
psi map[string]float64
4950
}
5051

5152
// Name implements controller.Controller interface.
@@ -117,6 +118,7 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
117118
triggerExpr := defaultTriggerExpr()
118119
scoringExpr := defaultScoringExpr()
119120
sampleInterval := defaultSampleInterval
121+
ctrl.psi = make(map[string]float64)
120122

121123
ticker := time.NewTicker(sampleInterval)
122124
tickerC := ticker.C
@@ -150,7 +152,14 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
150152
"time_since_trigger": time.Since(ctrl.ActionTriggered),
151153
}
152154

153-
trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext, ctrl.CgroupRoot)
155+
err := oom.PopulatePsiToCtx(ctrl.CgroupRoot, evalContext, ctrl.psi, sampleInterval)
156+
if err != nil {
157+
logger.Error("cannot populate PSI context", zap.Error(err))
158+
159+
continue
160+
}
161+
162+
trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext)
154163
if err != nil {
155164
logger.Error("cannot evaluate OOM trigger expression", zap.Error(err))
156165

internal/integration/k8s/oom.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ func (suite *OomSuite) SuiteName() string {
3939

4040
// TestOom verifies that system remains stable after handling an OOM event.
4141
func (suite *OomSuite) TestOom() {
42-
suite.T().Skip("skip the test until https://github.com/siderolabs/talos/issues/12077 is resolved")
43-
4442
if suite.Cluster == nil {
4543
suite.T().Skip("without full cluster state reaching out to the node IP is not reliable")
4644
}
@@ -77,7 +75,7 @@ func (suite *OomSuite) TestOom() {
7775
suite.Require().NoError(err)
7876

7977
memoryBytes := memInfo.GetMessages()[0].GetMeminfo().GetMemtotal() * 1024
80-
numReplicas := int((memoryBytes/1024/1024+2048-1)/2048) * numWorkers * 15
78+
numReplicas := int((memoryBytes/1024/1024+2048-1)/2048) * numWorkers * 25
8179

8280
suite.T().Logf("detected memory: %s, workers %d => scaling to %d replicas",
8381
humanize.IBytes(memoryBytes), numWorkers, numReplicas)
@@ -86,12 +84,16 @@ func (suite *OomSuite) TestOom() {
8684
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), numReplicas))
8785

8886
// Expect at least one OOM kill of stress-ng within 15 seconds
89-
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng"))
87+
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 1))
9088

9189
// Scale to 1, wait for deployment to scale down, proving system is operational
9290
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), 1))
9391
suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 1))
9492

93+
// Monitor OOM kills for 15 seconds and make sure no kills other than stress-ng happen
94+
// Allow 0 as well: ideally that'd be the case, but fail on anything not containing stress-ng
95+
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 0))
96+
9597
suite.APISuite.AssertClusterHealthy(ctx)
9698
}
9799

@@ -111,7 +113,7 @@ func patchToReplicas(t *testing.T, replicas int) []byte {
111113
// Waits for a period of time and return returns whether or not OOM events containing a specified process have been observed.
112114
//
113115
//nolint:gocyclo
114-
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string) bool {
116+
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string, n int) bool {
115117
startTime := time.Now()
116118

117119
watchCh := make(chan state.Event)
@@ -135,9 +137,9 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
135137
case <-timeoutCh:
136138
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
137139

138-
return numOOMObserved > 0
140+
return numOOMObserved >= n
139141
case <-timeToObserveCh:
140-
if numOOMObserved > 0 {
142+
if numOOMObserved >= n {
141143
// if we already observed some OOM events, consider it a success
142144
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
143145

@@ -150,11 +152,20 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
150152

151153
res := ev.Resource.(*runtime.OOMAction).TypedSpec()
152154

155+
found := false
156+
153157
for _, proc := range res.Processes {
154158
if strings.Contains(proc, substr) {
159+
found = true
155160
numOOMObserved++
156161
}
157162
}
163+
164+
if !found {
165+
suite.T().Logf("observed an OOM event not containing process substring %q: %q", substr, res.Processes)
166+
167+
return false
168+
}
158169
}
159170
}
160171
}

pkg/machinery/cel/celenv/celenv.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ var OOMTrigger = sync.OnceValue(func() *cel.Env {
9898
cel.Variable("memory_full_avg60", types.DoubleType),
9999
cel.Variable("memory_full_avg300", types.DoubleType),
100100
cel.Variable("memory_full_total", types.DoubleType),
101+
cel.Variable("d_memory_some_avg10", types.DoubleType),
102+
cel.Variable("d_memory_some_avg60", types.DoubleType),
103+
cel.Variable("d_memory_some_avg300", types.DoubleType),
104+
cel.Variable("d_memory_some_total", types.DoubleType),
105+
cel.Variable("d_memory_full_avg10", types.DoubleType),
106+
cel.Variable("d_memory_full_avg60", types.DoubleType),
107+
cel.Variable("d_memory_full_avg300", types.DoubleType),
108+
cel.Variable("d_memory_full_total", types.DoubleType),
101109
cel.Variable("time_since_trigger", types.DurationType),
102110
cel.OptionalTypes(),
103111
},
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
apiVersion: v1alpha1
22
kind: OOMConfig
3-
triggerExpression: memory_full_avg10 > 12.0 && time_since_trigger > duration("500ms")
3+
triggerExpression: memory_full_avg10 > 12.0 && d_memory_full_avg10 > 0.0 && time_since_trigger > duration("500ms")
44
cgroupRankingExpression: 'memory_max.hasValue() ? 0.0 : ({Besteffort: 1.0, Burstable: 0.5, Guaranteed: 0.0, Podruntime: 0.0, System: 0.0}[class] * double(memory_current.orValue(0u)) / double(memory_peak.orValue(0u) - memory_current.orValue(0u)))'
55
sampleInterval: 100ms

pkg/machinery/constants/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1314,7 +1314,7 @@ const (
13141314
ContainerMarkerFilePath = "/usr/etc/in-container"
13151315

13161316
// DefaultOOMTriggerExpression is the default CEL expression used to determine whether to trigger OOM.
1317-
DefaultOOMTriggerExpression = `memory_full_avg10 > 12.0 && time_since_trigger > duration("500ms")`
1317+
DefaultOOMTriggerExpression = `memory_full_avg10 > 12.0 && d_memory_full_avg10 > 0.0 && time_since_trigger > duration("500ms")`
13181318

13191319
// DefaultOOMCgroupRankingExpression is the default CEL expression used to rank cgroups for OOM killer.
13201320
DefaultOOMCgroupRankingExpression = `memory_max.hasValue() ? 0.0 :

website/content/v1.12/reference/configuration/runtime/oomconfig.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ title: OOMConfig
1616
{{< highlight yaml >}}
1717
apiVersion: v1alpha1
1818
kind: OOMConfig
19-
triggerExpression: memory_full_avg10 > 12.0 && time_since_trigger > duration("500ms") # This expression defines when to trigger OOM action.
19+
triggerExpression: memory_full_avg10 > 12.0 && d_memory_full_avg10 > 0.0 && time_since_trigger > duration("500ms") # This expression defines when to trigger OOM action.
2020
cgroupRankingExpression: 'memory_max.hasValue() ? 0.0 : ({Besteffort: 1.0, Burstable: 0.5, Guaranteed: 0.0, Podruntime: 0.0, System: 0.0}[class] * double(memory_current.orValue(0u)) / double(memory_peak.orValue(0u) - memory_current.orValue(0u)))' # This expression defines how to rank cgroups for OOM handler.
2121
sampleInterval: 100ms # How often should the trigger expression be evaluated.
2222
{{< /highlight >}}

0 commit comments

Comments
 (0)