Skip to content

Commit c93a9c6

Browse files
committed
fix: improve OOM controller stability and make test strict on false positives
- Add d_* PSI derivative values to the trigger expression context - Only trigger OOM action while PSI is rising - Make OOM test fail if controller kills a cgroup without stress-ng - Wait for stress-mem to terminate before proceeding with the next tests - Skip OOM test when running with race detector Signed-off-by: Dmitrii Sharshakov <[email protected]>
1 parent 021bbfe commit c93a9c6

File tree

13 files changed

+147
-42
lines changed

13 files changed

+147
-42
lines changed

.github/workflows/ci.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
22
#
3-
# Generated on 2025-10-23T15:10:44Z by kres 46e133d.
3+
# Generated on 2025-10-31T17:08:03Z by kres cd5a938.
44

55
concurrency:
66
group: ${{ github.head_ref || github.run_id }}
@@ -4670,6 +4670,7 @@ jobs:
46704670
make initramfs installer-base imager installer
46714671
- name: e2e-qemu-race
46724672
env:
4673+
EXTRA_TEST_ARGS: -talos.race
46734674
GITHUB_STEP_NAME: ${{ github.job}}-e2e-qemu-race
46744675
IMAGE_REGISTRY: registry.dev.siderolabs.io
46754676
QEMU_EXTRA_DISKS: "3"

.github/workflows/integration-qemu-race-cron.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
22
#
3-
# Generated on 2025-09-19T11:03:20Z by kres 065ec4c.
3+
# Generated on 2025-10-31T17:08:03Z by kres cd5a938.
44

55
concurrency:
66
group: ${{ github.head_ref || github.run_id }}
@@ -88,6 +88,7 @@ jobs:
8888
make initramfs installer-base imager installer
8989
- name: e2e-qemu-race
9090
env:
91+
EXTRA_TEST_ARGS: -talos.race
9192
GITHUB_STEP_NAME: ${{ github.job}}-e2e-qemu-race
9293
IMAGE_REGISTRY: registry.dev.siderolabs.io
9394
QEMU_EXTRA_DISKS: "3"

.kres.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2059,6 +2059,7 @@ spec:
20592059
WITH_CONFIG_PATCH_WORKER: "@hack/test/patches/ephemeral-nvme.yaml:@hack/test/patches/dm-raid-module.yaml"
20602060
QEMU_MEMORY_CONTROLPLANES: 4096 # race-enabled Talos consumes lots of RAM
20612061
QEMU_MEMORY_WORKERS: 4096
2062+
EXTRA_TEST_ARGS: "-talos.race"
20622063
TAG_SUFFIX: -race
20632064
IMAGE_REGISTRY: registry.dev.siderolabs.io
20642065
- name: save-talos-logs

internal/app/machined/pkg/controllers/runtime/internal/oom/oom.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io/fs"
1111
"os"
1212
"path/filepath"
13+
"time"
1314

1415
"github.com/google/cel-go/common/types"
1516
"go.uber.org/zap"
@@ -56,12 +57,7 @@ func (cgroup *RankedCgroup) CalculateScore(expr *cel.Expression) (float64, error
5657

5758
// EvaluateTrigger is a method obtaining data and evaluating the trigger expression.
5859
// When the result is true, designated OOM action is to be executed.
59-
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgroup string) (bool, error) {
60-
err := PopulatePsiToCtx(cgroup, evalContext)
61-
if err != nil {
62-
return false, fmt.Errorf("cannot populate PSI context: %w", err)
63-
}
64-
60+
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any) (bool, error) {
6561
trigger, err := triggerExpr.EvalBool(celenv.OOMTrigger(), evalContext)
6662
if err != nil {
6763
return false, fmt.Errorf("cannot evaluate expression: %w", err)
@@ -71,7 +67,7 @@ func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgr
7167
}
7268

7369
// PopulatePsiToCtx populates the context with PSI data from a cgroup.
74-
func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
70+
func PopulatePsiToCtx(cgroup string, evalContext map[string]any, psi map[string]float64, sampleInterval time.Duration) error {
7571
node, err := cgroups.GetCgroupProperty(cgroup, "memory.pressure")
7672
if err != nil {
7773
return fmt.Errorf("cannot read memory pressure: %w", err)
@@ -93,7 +89,15 @@ func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
9389
return fmt.Errorf("PSI is not defined")
9490
}
9591

92+
diff := 0.
93+
94+
if oldValue, ok := psi["memory_"+psiType+"_"+span]; ok {
95+
diff = (value.Float64() - oldValue) / sampleInterval.Seconds()
96+
}
97+
98+
evalContext["d_memory_"+psiType+"_"+span] = diff
9699
evalContext["memory_"+psiType+"_"+span] = value.Float64()
100+
psi["memory_"+psiType+"_"+span] = value.Float64()
97101
}
98102
}
99103

internal/app/machined/pkg/controllers/runtime/internal/oom/oom_test.go

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,29 +125,45 @@ func TestPopulatePsiToCtx(t *testing.T) {
125125
dir: "./testdata/trigger-false",
126126
expectErr: "",
127127
expect: map[string]any{
128-
"memory_full_avg10": 2.4,
129-
"memory_full_avg300": 1.71,
130-
"memory_full_avg60": 5.16,
131-
"memory_full_total": 1.0654831e+07,
132-
"memory_some_avg10": 2.82,
133-
"memory_some_avg300": 1.97,
134-
"memory_some_avg60": 5.95,
135-
"memory_some_total": 1.217234e+07,
128+
"memory_full_avg10": 2.4,
129+
"memory_full_avg300": 1.71,
130+
"memory_full_avg60": 5.16,
131+
"memory_full_total": 1.0654831e+07,
132+
"memory_some_avg10": 2.82,
133+
"memory_some_avg300": 1.97,
134+
"memory_some_avg60": 5.95,
135+
"memory_some_total": 1.217234e+07,
136+
"d_memory_full_avg10": 0.0,
137+
"d_memory_full_avg300": 0.0,
138+
"d_memory_full_avg60": 0.0,
139+
"d_memory_full_total": 0.0,
140+
"d_memory_some_avg10": 0.0,
141+
"d_memory_some_avg300": 0.0,
142+
"d_memory_some_avg60": 0.0,
143+
"d_memory_some_total": 0.0,
136144
},
137145
},
138146
{
139147
name: "true",
140148
dir: "./testdata/trigger-true",
141149
expectErr: "",
142150
expect: map[string]any{
143-
"memory_full_avg10": 14.54,
144-
"memory_full_avg60": 6.97,
145-
"memory_full_avg300": 1.82,
146-
"memory_full_total": 1.0654831e+07,
147-
"memory_some_avg10": 17.06,
148-
"memory_some_avg60": 8.04,
149-
"memory_some_avg300": 2.1,
150-
"memory_some_total": 1.217234e+07,
151+
"memory_full_avg10": 14.54,
152+
"memory_full_avg60": 6.97,
153+
"memory_full_avg300": 1.82,
154+
"memory_full_total": 1.0654831e+07,
155+
"memory_some_avg10": 17.06,
156+
"memory_some_avg60": 8.04,
157+
"memory_some_avg300": 2.1,
158+
"memory_some_total": 1.217234e+07,
159+
"d_memory_full_avg10": 0.0,
160+
"d_memory_full_avg300": 0.0,
161+
"d_memory_full_avg60": 0.0,
162+
"d_memory_full_total": 0.0,
163+
"d_memory_some_avg10": 0.0,
164+
"d_memory_some_avg300": 0.0,
165+
"d_memory_some_avg60": 0.0,
166+
"d_memory_some_total": 0.0,
151167
},
152168
},
153169
} {
@@ -156,7 +172,7 @@ func TestPopulatePsiToCtx(t *testing.T) {
156172

157173
ctx := map[string]any{}
158174

159-
err := oom.PopulatePsiToCtx(test.dir, ctx)
175+
err := oom.PopulatePsiToCtx(test.dir, ctx, make(map[string]float64), 0)
160176

161177
if test.expectErr == "" {
162178
require.NoError(t, err)
@@ -192,7 +208,7 @@ func TestEvaluateTrigger(t *testing.T) {
192208
},
193209
triggerExpr: triggerExpr1,
194210
expect: false,
195-
expectErr: "cannot populate PSI context: cannot read memory pressure: error opening cgroupfs file open testdata/empty/memory.pressure: no such file or directory",
211+
expectErr: "cannot read memory pressure: error opening cgroupfs file open testdata/empty/memory.pressure: no such file or directory",
196212
},
197213
{
198214
name: "cgroup-false",
@@ -241,12 +257,25 @@ func TestEvaluateTrigger(t *testing.T) {
241257
t.Run(test.name, func(t *testing.T) {
242258
t.Parallel()
243259

244-
trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx, test.dir)
245-
246-
assert.Equal(t, test.expect, trigger)
260+
err := oom.PopulatePsiToCtx(test.dir, test.ctx, map[string]float64{
261+
"memory_full_avg10": 0,
262+
"memory_full_avg300": 0,
263+
"memory_full_avg60": 0,
264+
"memory_full_total": 0,
265+
"memory_some_avg10": 0,
266+
"memory_some_avg300": 0,
267+
"memory_some_avg60": 0,
268+
"memory_some_total": 0,
269+
}, 0)
247270

248271
if test.expectErr == "" {
249272
require.NoError(t, err)
273+
274+
trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx)
275+
276+
assert.Equal(t, test.expect, trigger)
277+
278+
require.NoError(t, err)
250279
} else {
251280
assert.ErrorContains(t, err, test.expectErr)
252281
}

internal/app/machined/pkg/controllers/runtime/oom.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type OOMController struct {
4646
V1Alpha1Mode runtime.Mode
4747
actionLog []actionLogItem
4848
idSeq int
49+
psi map[string]float64
4950
}
5051

5152
// Name implements controller.Controller interface.
@@ -117,6 +118,7 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
117118
triggerExpr := defaultTriggerExpr()
118119
scoringExpr := defaultScoringExpr()
119120
sampleInterval := defaultSampleInterval
121+
ctrl.psi = make(map[string]float64)
120122

121123
ticker := time.NewTicker(sampleInterval)
122124
tickerC := ticker.C
@@ -150,7 +152,14 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
150152
"time_since_trigger": time.Since(ctrl.ActionTriggered),
151153
}
152154

153-
trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext, ctrl.CgroupRoot)
155+
err := oom.PopulatePsiToCtx(ctrl.CgroupRoot, evalContext, ctrl.psi, sampleInterval)
156+
if err != nil {
157+
logger.Error("cannot populate PSI context", zap.Error(err))
158+
159+
continue
160+
}
161+
162+
trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext)
154163
if err != nil {
155164
logger.Error("cannot evaluate OOM trigger expression", zap.Error(err))
156165

internal/integration/base/base.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ type TalosSuite struct {
6262
CSITestTimeout string
6363
// Airgapped marks that cluster has no access to external networks
6464
Airgapped bool
65+
// Race informs test suites about race detector being enabled (e.g. for skipping incompatible tests)
66+
Race bool
6567

6668
discoveredNodes cluster.Info
6769
}

internal/integration/integration_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var (
4141
extensionsNvidia bool
4242
verifyUKIBooted bool
4343
airgapped bool
44+
race bool
4445

4546
talosConfig string
4647
endpoint string
@@ -118,6 +119,7 @@ func TestIntegration(t *testing.T) {
118119
CSITestName: csiTestName,
119120
CSITestTimeout: csiTestTimeout,
120121
Airgapped: airgapped,
122+
Race: race,
121123
})
122124
}
123125

@@ -151,6 +153,7 @@ func init() {
151153
flag.BoolVar(&selinuxEnforcing, "talos.enforcing", false, "enable tests for SELinux enforcing mode")
152154
flag.BoolVar(&extensionsQEMU, "talos.extensions.qemu", false, "enable tests for qemu extensions")
153155
flag.BoolVar(&extensionsNvidia, "talos.extensions.nvidia", false, "enable tests for nvidia extensions")
156+
flag.BoolVar(&race, "talos.race", false, "skip tests that are incompatible with race detector")
154157
flag.BoolVar(&verifyUKIBooted, "talos.verifyukibooted", true, "enable tests for verifying that Talos was booted using a UKI")
155158

156159
flag.StringVar(

internal/integration/k8s/oom.go

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/dustin/go-humanize"
1818
"github.com/stretchr/testify/require"
1919
"go.yaml.in/yaml/v4"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2021

2122
"github.com/siderolabs/talos/internal/integration/base"
2223
"github.com/siderolabs/talos/pkg/machinery/client"
@@ -39,8 +40,6 @@ func (suite *OomSuite) SuiteName() string {
3940

4041
// TestOom verifies that system remains stable after handling an OOM event.
4142
func (suite *OomSuite) TestOom() {
42-
suite.T().Skip("skip the test until https://github.com/siderolabs/talos/issues/12077 is resolved")
43-
4443
if suite.Cluster == nil {
4544
suite.T().Skip("without full cluster state reaching out to the node IP is not reliable")
4645
}
@@ -49,6 +48,10 @@ func (suite *OomSuite) TestOom() {
4948
suite.T().Skip("skipping in short mode")
5049
}
5150

51+
if suite.Race {
52+
suite.T().Skip("skipping as OOM tests are incompatible with race detector")
53+
}
54+
5255
if suite.Cluster.Provisioner() != base.ProvisionerQEMU {
5356
suite.T().Skip("skipping OOM test since provisioner is not qemu")
5457
}
@@ -59,10 +62,33 @@ func (suite *OomSuite) TestOom() {
5962
oomPodManifest := suite.ParseManifests(oomPodSpec)
6063

6164
suite.T().Cleanup(func() {
62-
cleanUpCtx, cleanupCancel := context.WithTimeout(context.Background(), time.Minute)
65+
cleanUpCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Minute)
6366
defer cleanupCancel()
6467

6568
suite.DeleteManifests(cleanUpCtx, oomPodManifest)
69+
70+
ticker := time.NewTicker(time.Second)
71+
done := cleanUpCtx.Done()
72+
73+
// Wait for all stress-mem pods to complete terminating
74+
for {
75+
select {
76+
case <-ticker.C:
77+
pods, err := suite.Clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
78+
LabelSelector: "app=stress-mem",
79+
})
80+
81+
suite.Require().NoError(err)
82+
83+
if len(pods.Items) == 0 {
84+
return
85+
}
86+
case <-done:
87+
suite.Require().Fail("Timed out waiting for cleanup")
88+
89+
return
90+
}
91+
}
6692
})
6793

6894
suite.ApplyManifests(ctx, oomPodManifest)
@@ -77,7 +103,7 @@ func (suite *OomSuite) TestOom() {
77103
suite.Require().NoError(err)
78104

79105
memoryBytes := memInfo.GetMessages()[0].GetMeminfo().GetMemtotal() * 1024
80-
numReplicas := int((memoryBytes/1024/1024+2048-1)/2048) * numWorkers * 15
106+
numReplicas := int((memoryBytes/1024/1024+2048-1)/2048) * numWorkers * 25
81107

82108
suite.T().Logf("detected memory: %s, workers %d => scaling to %d replicas",
83109
humanize.IBytes(memoryBytes), numWorkers, numReplicas)
@@ -86,12 +112,16 @@ func (suite *OomSuite) TestOom() {
86112
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), numReplicas))
87113

88114
// Expect at least one OOM kill of stress-ng within 15 seconds
89-
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng"))
115+
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 1))
90116

91117
// Scale to 1, wait for deployment to scale down, proving system is operational
92118
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), 1))
93119
suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 1))
94120

121+
// Monitor OOM kills for 15 seconds and make sure no kills other than stress-ng happen
122+
// Allow 0 as well: ideally that'd be the case, but fail on anything not containing stress-ng
123+
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 0))
124+
95125
suite.APISuite.AssertClusterHealthy(ctx)
96126
}
97127

@@ -111,7 +141,7 @@ func patchToReplicas(t *testing.T, replicas int) []byte {
111141
// Waits for a period of time and return returns whether or not OOM events containing a specified process have been observed.
112142
//
113143
//nolint:gocyclo
114-
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string) bool {
144+
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string, n int) bool {
115145
startTime := time.Now()
116146

117147
watchCh := make(chan state.Event)
@@ -135,9 +165,9 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
135165
case <-timeoutCh:
136166
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
137167

138-
return numOOMObserved > 0
168+
return numOOMObserved >= n
139169
case <-timeToObserveCh:
140-
if numOOMObserved > 0 {
170+
if numOOMObserved >= n {
141171
// if we already observed some OOM events, consider it a success
142172
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
143173

@@ -150,10 +180,27 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
150180

151181
res := ev.Resource.(*runtime.OOMAction).TypedSpec()
152182

183+
bailOut := false
184+
153185
for _, proc := range res.Processes {
154186
if strings.Contains(proc, substr) {
155187
numOOMObserved++
188+
189+
break
156190
}
191+
192+
// Sometimes OOM catches containers in restart phase (while the
193+
// cgroup has previously accumulated OOM score).
194+
// Consider an OOM event wrong if something other than that is found.
195+
if !strings.Contains(proc, "runc init") && !strings.Contains(proc, "/pause") && proc != "" {
196+
bailOut = true
197+
}
198+
}
199+
200+
if bailOut {
201+
suite.T().Logf("observed an OOM event not containing process substring %q: %q (%d containing)", substr, res.Processes, numOOMObserved)
202+
203+
return false
157204
}
158205
}
159206
}

0 commit comments

Comments
 (0)