Skip to content

Commit 79bffbc

Browse files
committed
fix: improve OOM controller stability and make test strict on false positives
- Add d_* PSI derivative values to the trigger expression context - Only trigger OOM action while PSI is rising - Make OOM test fail if controller kills a cgroup without stress-ng Signed-off-by: Dmitrii Sharshakov <[email protected]>
1 parent d69305a commit 79bffbc

File tree

6 files changed

+52
-17
lines changed

6 files changed

+52
-17
lines changed

internal/app/machined/pkg/controllers/runtime/internal/oom/oom.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io/fs"
1111
"os"
1212
"path/filepath"
13+
"time"
1314

1415
"github.com/google/cel-go/common/types"
1516
"go.uber.org/zap"
@@ -56,12 +57,7 @@ func (cgroup *RankedCgroup) CalculateScore(expr *cel.Expression) (float64, error
5657

5758
// EvaluateTrigger is a method obtaining data and evaluating the trigger expression.
5859
// When the result is true, designated OOM action is to be executed.
59-
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgroup string) (bool, error) {
60-
err := PopulatePsiToCtx(cgroup, evalContext)
61-
if err != nil {
62-
return false, fmt.Errorf("cannot populate PSI context: %w", err)
63-
}
64-
60+
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any) (bool, error) {
6561
trigger, err := triggerExpr.EvalBool(celenv.OOMTrigger(), evalContext)
6662
if err != nil {
6763
return false, fmt.Errorf("cannot evaluate expression: %w", err)
@@ -71,7 +67,7 @@ func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgr
7167
}
7268

7369
// PopulatePsiToCtx populates the context with PSI data from a cgroup.
74-
func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
70+
func PopulatePsiToCtx(cgroup string, evalContext map[string]any, psi map[string]float64, sampleInterval time.Duration) error {
7571
node, err := cgroups.GetCgroupProperty(cgroup, "memory.pressure")
7672
if err != nil {
7773
return fmt.Errorf("cannot read memory pressure: %w", err)
@@ -93,7 +89,15 @@ func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
9389
return fmt.Errorf("PSI is not defined")
9490
}
9591

92+
diff := 0.
93+
94+
if oldValue, ok := psi["memory_"+psiType+"_"+span]; ok {
95+
diff = (value.Float64() - oldValue) / sampleInterval.Seconds()
96+
}
97+
98+
evalContext["d_memory_"+psiType+"_"+span] = diff
9699
evalContext["memory_"+psiType+"_"+span] = value.Float64()
100+
psi["memory_"+psiType+"_"+span] = value.Float64()
97101
}
98102
}
99103

internal/app/machined/pkg/controllers/runtime/internal/oom/oom_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestPopulatePsiToCtx(t *testing.T) {
156156

157157
ctx := map[string]any{}
158158

159-
err := oom.PopulatePsiToCtx(test.dir, ctx)
159+
err := oom.PopulatePsiToCtx(test.dir, ctx, make(map[string]float64), 0)
160160

161161
if test.expectErr == "" {
162162
require.NoError(t, err)
@@ -241,7 +241,10 @@ func TestEvaluateTrigger(t *testing.T) {
241241
t.Run(test.name, func(t *testing.T) {
242242
t.Parallel()
243243

244-
trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx, test.dir)
244+
err := oom.PopulatePsiToCtx(test.dir, test.ctx, make(map[string]float64), 0)
245+
require.NoError(t, err)
246+
247+
trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx)
245248

246249
assert.Equal(t, test.expect, trigger)
247250

internal/app/machined/pkg/controllers/runtime/oom.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type OOMController struct {
4646
V1Alpha1Mode runtime.Mode
4747
actionLog []actionLogItem
4848
idSeq int
49+
psi map[string]float64
4950
}
5051

5152
// Name implements controller.Controller interface.
@@ -117,6 +118,7 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
117118
triggerExpr := defaultTriggerExpr()
118119
scoringExpr := defaultScoringExpr()
119120
sampleInterval := defaultSampleInterval
121+
ctrl.psi = make(map[string]float64)
120122

121123
ticker := time.NewTicker(sampleInterval)
122124
tickerC := ticker.C
@@ -150,7 +152,14 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
150152
"time_since_trigger": time.Since(ctrl.ActionTriggered),
151153
}
152154

153-
trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext, ctrl.CgroupRoot)
155+
err := oom.PopulatePsiToCtx(ctrl.CgroupRoot, evalContext, ctrl.psi, sampleInterval)
156+
if err != nil {
157+
logger.Error("cannot populate PSI context", zap.Error(err))
158+
159+
continue
160+
}
161+
162+
trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext)
154163
if err != nil {
155164
logger.Error("cannot evaluate OOM trigger expression", zap.Error(err))
156165

internal/integration/k8s/oom.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ func (suite *OomSuite) SuiteName() string {
3939

4040
// TestOom verifies that system remains stable after handling an OOM event.
4141
func (suite *OomSuite) TestOom() {
42-
suite.T().Skip("skip the test until https://github.com/siderolabs/talos/issues/12077 is resolved")
43-
4442
if suite.Cluster == nil {
4543
suite.T().Skip("without full cluster state reaching out to the node IP is not reliable")
4644
}
@@ -86,12 +84,16 @@ func (suite *OomSuite) TestOom() {
8684
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), numReplicas))
8785

8886
// Expect at least one OOM kill of stress-ng within 15 seconds
89-
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng"))
87+
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 1))
9088

9189
// Scale to 1, wait for deployment to scale down, proving system is operational
9290
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), 1))
9391
suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 1))
9492

93+
// Monitor OOM kills for 15 seconds and make sure no kills other than stress-ng happen
94+
// Allow 0 as well: ideally that'd be the case, but fail on anything not containing stress-ng
95+
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 0))
96+
9597
suite.APISuite.AssertClusterHealthy(ctx)
9698
}
9799

@@ -111,7 +113,7 @@ func patchToReplicas(t *testing.T, replicas int) []byte {
111113
// Waits for a period of time and return returns whether or not OOM events containing a specified process have been observed.
112114
//
113115
//nolint:gocyclo
114-
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string) bool {
116+
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string, n int) bool {
115117
startTime := time.Now()
116118

117119
watchCh := make(chan state.Event)
@@ -135,9 +137,9 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
135137
case <-timeoutCh:
136138
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
137139

138-
return numOOMObserved > 0
140+
return numOOMObserved >= n
139141
case <-timeToObserveCh:
140-
if numOOMObserved > 0 {
142+
if numOOMObserved >= n {
141143
// if we already observed some OOM events, consider it a success
142144
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
143145

@@ -150,11 +152,20 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
150152

151153
res := ev.Resource.(*runtime.OOMAction).TypedSpec()
152154

155+
found := false
156+
153157
for _, proc := range res.Processes {
154158
if strings.Contains(proc, substr) {
159+
found = true
155160
numOOMObserved++
156161
}
157162
}
163+
164+
if !found {
165+
suite.T().Logf("observed an OOM event not containing process substring %q: %q", substr, res.Processes)
166+
167+
return false
168+
}
158169
}
159170
}
160171
}

pkg/machinery/cel/celenv/celenv.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ var OOMTrigger = sync.OnceValue(func() *cel.Env {
9898
cel.Variable("memory_full_avg60", types.DoubleType),
9999
cel.Variable("memory_full_avg300", types.DoubleType),
100100
cel.Variable("memory_full_total", types.DoubleType),
101+
cel.Variable("d_memory_some_avg10", types.DoubleType),
102+
cel.Variable("d_memory_some_avg60", types.DoubleType),
103+
cel.Variable("d_memory_some_avg300", types.DoubleType),
104+
cel.Variable("d_memory_some_total", types.DoubleType),
105+
cel.Variable("d_memory_full_avg10", types.DoubleType),
106+
cel.Variable("d_memory_full_avg60", types.DoubleType),
107+
cel.Variable("d_memory_full_avg300", types.DoubleType),
108+
cel.Variable("d_memory_full_total", types.DoubleType),
101109
cel.Variable("time_since_trigger", types.DurationType),
102110
cel.OptionalTypes(),
103111
},

pkg/machinery/constants/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1314,7 +1314,7 @@ const (
13141314
ContainerMarkerFilePath = "/usr/etc/in-container"
13151315

13161316
// DefaultOOMTriggerExpression is the default CEL expression used to determine whether to trigger OOM.
1317-
DefaultOOMTriggerExpression = `memory_full_avg10 > 12.0 && time_since_trigger > duration("500ms")`
1317+
DefaultOOMTriggerExpression = `memory_full_avg10 > 12.0 && d_memory_full_avg10 > 0.0 && time_since_trigger > duration("500ms")`
13181318

13191319
// DefaultOOMCgroupRankingExpression is the default CEL expression used to rank cgroups for OOM killer.
13201320
DefaultOOMCgroupRankingExpression = `memory_max.hasValue() ? 0.0 :

0 commit comments

Comments
 (0)