3 changes: 2 additions & 1 deletion .github/workflows/ci.yaml
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2025-10-23T15:10:44Z by kres 46e133d.
# Generated on 2025-10-31T17:08:03Z by kres cd5a938.

concurrency:
group: ${{ github.head_ref || github.run_id }}
@@ -4670,6 +4670,7 @@ jobs:
make initramfs installer-base imager installer
- name: e2e-qemu-race
env:
EXTRA_TEST_ARGS: -talos.race
GITHUB_STEP_NAME: ${{ github.job}}-e2e-qemu-race
IMAGE_REGISTRY: registry.dev.siderolabs.io
QEMU_EXTRA_DISKS: "3"
3 changes: 2 additions & 1 deletion .github/workflows/integration-qemu-race-cron.yaml
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2025-09-19T11:03:20Z by kres 065ec4c.
# Generated on 2025-10-31T17:08:03Z by kres cd5a938.

concurrency:
group: ${{ github.head_ref || github.run_id }}
@@ -88,6 +88,7 @@ jobs:
make initramfs installer-base imager installer
- name: e2e-qemu-race
env:
EXTRA_TEST_ARGS: -talos.race
GITHUB_STEP_NAME: ${{ github.job}}-e2e-qemu-race
IMAGE_REGISTRY: registry.dev.siderolabs.io
QEMU_EXTRA_DISKS: "3"
1 change: 1 addition & 0 deletions .kres.yaml
@@ -2059,6 +2059,7 @@ spec:
WITH_CONFIG_PATCH_WORKER: "@hack/test/patches/ephemeral-nvme.yaml:@hack/test/patches/dm-raid-module.yaml"
QEMU_MEMORY_CONTROLPLANES: 4096 # race-enabled Talos consumes lots of RAM
QEMU_MEMORY_WORKERS: 4096
EXTRA_TEST_ARGS: "-talos.race"
TAG_SUFFIX: -race
IMAGE_REGISTRY: registry.dev.siderolabs.io
- name: save-talos-logs
18 changes: 11 additions & 7 deletions internal/app/machined/pkg/controllers/runtime/internal/oom/oom.go
@@ -10,6 +10,7 @@ import (
"io/fs"
"os"
"path/filepath"
"time"

"github.com/google/cel-go/common/types"
"go.uber.org/zap"
@@ -56,12 +57,7 @@ func (cgroup *RankedCgroup) CalculateScore(expr *cel.Expression) (float64, error

// EvaluateTrigger is a method obtaining data and evaluating the trigger expression.
// When the result is true, designated OOM action is to be executed.
func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgroup string) (bool, error) {
err := PopulatePsiToCtx(cgroup, evalContext)
if err != nil {
return false, fmt.Errorf("cannot populate PSI context: %w", err)
}

func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any) (bool, error) {
trigger, err := triggerExpr.EvalBool(celenv.OOMTrigger(), evalContext)
if err != nil {
return false, fmt.Errorf("cannot evaluate expression: %w", err)
@@ -71,7 +67,7 @@ func EvaluateTrigger(triggerExpr cel.Expression, evalContext map[string]any, cgr
}

// PopulatePsiToCtx populates the context with PSI data from a cgroup.
func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
func PopulatePsiToCtx(cgroup string, evalContext map[string]any, psi map[string]float64, sampleInterval time.Duration) error {
node, err := cgroups.GetCgroupProperty(cgroup, "memory.pressure")
if err != nil {
return fmt.Errorf("cannot read memory pressure: %w", err)
@@ -93,7 +89,15 @@ func PopulatePsiToCtx(cgroup string, evalContext map[string]any) error {
return fmt.Errorf("PSI is not defined")
}

diff := 0.

if oldValue, ok := psi["memory_"+psiType+"_"+span]; ok {
diff = (value.Float64() - oldValue) / sampleInterval.Seconds()
}

evalContext["d_memory_"+psiType+"_"+span] = diff
evalContext["memory_"+psiType+"_"+span] = value.Float64()
psi["memory_"+psiType+"_"+span] = value.Float64()
}
}

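For illustration, a minimal self-contained Go sketch of the derivative bookkeeping added above (the function and variable names are invented for this sketch, not the Talos API; the zero-interval guard is an assumption of the sketch): the first sample leaves every d_memory_* value at zero, and each later sample stores the per-second rate of change next to the raw PSI value.

package main

import (
	"fmt"
	"time"
)

// populateRates mirrors the logic in the hunk above: for every PSI metric it records the
// raw value and its per-second rate of change since the previous sample, then remembers
// the value for the next round.
func populateRates(evalContext map[string]any, prev, sample map[string]float64, interval time.Duration) {
	for key, value := range sample {
		rate := 0.0

		if old, ok := prev[key]; ok && interval > 0 {
			rate = (value - old) / interval.Seconds()
		}

		evalContext["d_"+key] = rate
		evalContext[key] = value
		prev[key] = value
	}
}

func main() {
	prev := map[string]float64{}
	evalContext := map[string]any{}

	// First sample: no previous value, so the derivative stays 0.
	populateRates(evalContext, prev, map[string]float64{"memory_full_avg10": 2.4}, time.Second)
	fmt.Println(evalContext["d_memory_full_avg10"]) // 0

	// One second later the metric has risen to 14.54, i.e. a rate of ~12.14 per second.
	populateRates(evalContext, prev, map[string]float64{"memory_full_avg10": 14.54}, time.Second)
	fmt.Println(evalContext["d_memory_full_avg10"]) // ~12.14
}

The hunks that follow update the accompanying unit tests for the new PopulatePsiToCtx and EvaluateTrigger signatures.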
@@ -125,29 +125,45 @@ func TestPopulatePsiToCtx(t *testing.T) {
dir: "./testdata/trigger-false",
expectErr: "",
expect: map[string]any{
"memory_full_avg10": 2.4,
"memory_full_avg300": 1.71,
"memory_full_avg60": 5.16,
"memory_full_total": 1.0654831e+07,
"memory_some_avg10": 2.82,
"memory_some_avg300": 1.97,
"memory_some_avg60": 5.95,
"memory_some_total": 1.217234e+07,
"memory_full_avg10": 2.4,
"memory_full_avg300": 1.71,
"memory_full_avg60": 5.16,
"memory_full_total": 1.0654831e+07,
"memory_some_avg10": 2.82,
"memory_some_avg300": 1.97,
"memory_some_avg60": 5.95,
"memory_some_total": 1.217234e+07,
"d_memory_full_avg10": 0.0,
"d_memory_full_avg300": 0.0,
"d_memory_full_avg60": 0.0,
"d_memory_full_total": 0.0,
"d_memory_some_avg10": 0.0,
"d_memory_some_avg300": 0.0,
"d_memory_some_avg60": 0.0,
"d_memory_some_total": 0.0,
},
},
{
name: "true",
dir: "./testdata/trigger-true",
expectErr: "",
expect: map[string]any{
"memory_full_avg10": 14.54,
"memory_full_avg60": 6.97,
"memory_full_avg300": 1.82,
"memory_full_total": 1.0654831e+07,
"memory_some_avg10": 17.06,
"memory_some_avg60": 8.04,
"memory_some_avg300": 2.1,
"memory_some_total": 1.217234e+07,
"memory_full_avg10": 14.54,
"memory_full_avg60": 6.97,
"memory_full_avg300": 1.82,
"memory_full_total": 1.0654831e+07,
"memory_some_avg10": 17.06,
"memory_some_avg60": 8.04,
"memory_some_avg300": 2.1,
"memory_some_total": 1.217234e+07,
"d_memory_full_avg10": 0.0,
"d_memory_full_avg300": 0.0,
"d_memory_full_avg60": 0.0,
"d_memory_full_total": 0.0,
"d_memory_some_avg10": 0.0,
"d_memory_some_avg300": 0.0,
"d_memory_some_avg60": 0.0,
"d_memory_some_total": 0.0,
},
},
} {
@@ -156,7 +172,7 @@

ctx := map[string]any{}

err := oom.PopulatePsiToCtx(test.dir, ctx)
err := oom.PopulatePsiToCtx(test.dir, ctx, make(map[string]float64), 0)

if test.expectErr == "" {
require.NoError(t, err)
@@ -192,7 +208,7 @@ func TestEvaluateTrigger(t *testing.T) {
},
triggerExpr: triggerExpr1,
expect: false,
expectErr: "cannot populate PSI context: cannot read memory pressure: error opening cgroupfs file open testdata/empty/memory.pressure: no such file or directory",
expectErr: "cannot read memory pressure: error opening cgroupfs file open testdata/empty/memory.pressure: no such file or directory",
},
{
name: "cgroup-false",
@@ -241,12 +257,25 @@
t.Run(test.name, func(t *testing.T) {
t.Parallel()

trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx, test.dir)

assert.Equal(t, test.expect, trigger)
err := oom.PopulatePsiToCtx(test.dir, test.ctx, map[string]float64{
"memory_full_avg10": 0,
"memory_full_avg300": 0,
"memory_full_avg60": 0,
"memory_full_total": 0,
"memory_some_avg10": 0,
"memory_some_avg300": 0,
"memory_some_avg60": 0,
"memory_some_total": 0,
}, 0)

if test.expectErr == "" {
require.NoError(t, err)

trigger, err := oom.EvaluateTrigger(test.triggerExpr, test.ctx)

assert.Equal(t, test.expect, trigger)

require.NoError(t, err)
} else {
assert.ErrorContains(t, err, test.expectErr)
}
11 changes: 10 additions & 1 deletion internal/app/machined/pkg/controllers/runtime/oom.go
@@ -46,6 +46,7 @@ type OOMController struct {
V1Alpha1Mode runtime.Mode
actionLog []actionLogItem
idSeq int
psi map[string]float64
}

// Name implements controller.Controller interface.
@@ -117,6 +118,7 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
triggerExpr := defaultTriggerExpr()
scoringExpr := defaultScoringExpr()
sampleInterval := defaultSampleInterval
ctrl.psi = make(map[string]float64)

ticker := time.NewTicker(sampleInterval)
tickerC := ticker.C
@@ -150,7 +152,14 @@
"time_since_trigger": time.Since(ctrl.ActionTriggered),
}

trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext, ctrl.CgroupRoot)
err := oom.PopulatePsiToCtx(ctrl.CgroupRoot, evalContext, ctrl.psi, sampleInterval)
if err != nil {
logger.Error("cannot populate PSI context", zap.Error(err))

continue
}

trigger, err := oom.EvaluateTrigger(triggerExpr, evalContext)
if err != nil {
logger.Error("cannot evaluate OOM trigger expression", zap.Error(err))

2 changes: 2 additions & 0 deletions internal/integration/base/base.go
@@ -62,6 +62,8 @@ type TalosSuite struct {
CSITestTimeout string
// Airgapped marks that cluster has no access to external networks
Airgapped bool
// Race informs test suites about race detector being enabled (e.g. for skipping incompatible tests)
Race bool

discoveredNodes cluster.Info
}
3 changes: 3 additions & 0 deletions internal/integration/integration_test.go
@@ -41,6 +41,7 @@ var (
extensionsNvidia bool
verifyUKIBooted bool
airgapped bool
race bool

talosConfig string
endpoint string
@@ -118,6 +119,7 @@ func TestIntegration(t *testing.T) {
CSITestName: csiTestName,
CSITestTimeout: csiTestTimeout,
Airgapped: airgapped,
Race: race,
})
}

@@ -151,6 +153,7 @@ func init() {
flag.BoolVar(&selinuxEnforcing, "talos.enforcing", false, "enable tests for SELinux enforcing mode")
flag.BoolVar(&extensionsQEMU, "talos.extensions.qemu", false, "enable tests for qemu extensions")
flag.BoolVar(&extensionsNvidia, "talos.extensions.nvidia", false, "enable tests for nvidia extensions")
flag.BoolVar(&race, "talos.race", false, "skip tests that are incompatible with race detector")
flag.BoolVar(&verifyUKIBooted, "talos.verifyukibooted", true, "enable tests for verifying that Talos was booted using a UKI")

flag.StringVar(
63 changes: 55 additions & 8 deletions internal/integration/k8s/oom.go
@@ -17,6 +17,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
"go.yaml.in/yaml/v4"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/siderolabs/talos/internal/integration/base"
"github.com/siderolabs/talos/pkg/machinery/client"
@@ -39,8 +40,6 @@ func (suite *OomSuite) SuiteName() string {

// TestOom verifies that system remains stable after handling an OOM event.
func (suite *OomSuite) TestOom() {
suite.T().Skip("skip the test until https://github.com/siderolabs/talos/issues/12077 is resolved")

if suite.Cluster == nil {
suite.T().Skip("without full cluster state reaching out to the node IP is not reliable")
}
@@ -49,6 +48,10 @@
suite.T().Skip("skipping in short mode")
}

if suite.Race {
suite.T().Skip("skipping as OOM tests are incompatible with race detector")
}

if suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping OOM test since provisioner is not qemu")
}
@@ -59,10 +62,33 @@
oomPodManifest := suite.ParseManifests(oomPodSpec)

suite.T().Cleanup(func() {
cleanUpCtx, cleanupCancel := context.WithTimeout(context.Background(), time.Minute)
cleanUpCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cleanupCancel()

suite.DeleteManifests(cleanUpCtx, oomPodManifest)

ticker := time.NewTicker(time.Second)
done := cleanUpCtx.Done()

// Wait for all stress-mem pods to complete terminating
for {
select {
case <-ticker.C:
pods, err := suite.Clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
LabelSelector: "app=stress-mem",
})

suite.Require().NoError(err)

if len(pods.Items) == 0 {
return
}
case <-done:
suite.Require().Fail("Timed out waiting for cleanup")

return
}
}
})

suite.ApplyManifests(ctx, oomPodManifest)
@@ -77,7 +103,7 @@
suite.Require().NoError(err)

memoryBytes := memInfo.GetMessages()[0].GetMeminfo().GetMemtotal() * 1024
numReplicas := int((memoryBytes/1024/1024+2048-1)/2048) * numWorkers * 15
numReplicas := int((memoryBytes/1024/1024+2048-1)/2048) * numWorkers * 25

suite.T().Logf("detected memory: %s, workers %d => scaling to %d replicas",
humanize.IBytes(memoryBytes), numWorkers, numReplicas)
@@ -86,12 +112,16 @@
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), numReplicas))

// Expect at least one OOM kill of stress-ng within 15 seconds
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng"))
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 1))

// Scale to 1, wait for deployment to scale down, proving system is operational
suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), 1))
suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 1))

// Monitor OOM kills for 15 seconds and make sure no kills other than stress-ng happen
// Allow 0 as well: ideally that'd be the case, but fail on anything not containing stress-ng
suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng", 0))

suite.APISuite.AssertClusterHealthy(ctx)
}

@@ -111,7 +141,7 @@
// Waits for a period of time and returns whether or not OOM events containing a specified process have been observed.
//
//nolint:gocyclo
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string) bool {
func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string, n int) bool {
startTime := time.Now()

watchCh := make(chan state.Event)
@@ -135,9 +165,9 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, time
case <-timeoutCh:
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)

return numOOMObserved > 0
return numOOMObserved >= n
case <-timeToObserveCh:
if numOOMObserved > 0 {
if numOOMObserved >= n {
// if we already observed some OOM events, consider it a success
suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)

@@ -150,10 +180,27 @@

res := ev.Resource.(*runtime.OOMAction).TypedSpec()

bailOut := false

for _, proc := range res.Processes {
if strings.Contains(proc, substr) {
numOOMObserved++

break
}

// Sometimes OOM catches containers in restart phase (while the
// cgroup has previously accumulated OOM score).
// Consider an OOM event wrong if something other than that is found.
if !strings.Contains(proc, "runc init") && !strings.Contains(proc, "/pause") && proc != "" {
bailOut = true
}
}

if bailOut {
suite.T().Logf("observed an OOM event not containing process substring %q: %q (%d containing)", substr, res.Processes, numOOMObserved)

return false
}
}
}
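To spell out the counting and bail-out rules introduced above, here is a minimal self-contained Go sketch (the function name and the process lists in main are invented for the example, not taken from real OOMAction resources): an event whose victims contain the expected substring counts toward the minimum n, restart leftovers such as "runc init" or "/pause" are tolerated, and any other victim aborts the wait as a failure.

package main

import (
	"fmt"
	"strings"
)

// classify mirrors the per-event logic in waitForOOMKilled: it reports whether an OOM
// event counts toward the expected kills and whether it should abort the wait. As in
// the original, scanning stops as soon as the expected substring is found.
func classify(processes []string, substr string) (counted, bailOut bool) {
	for _, proc := range processes {
		if strings.Contains(proc, substr) {
			counted = true

			break
		}

		if !strings.Contains(proc, "runc init") && !strings.Contains(proc, "/pause") && proc != "" {
			bailOut = true
		}
	}

	return counted, bailOut
}

func main() {
	// Hypothetical process lists for illustration only.
	fmt.Println(classify([]string{"stress-ng-vm"}, "stress-ng"))        // true false: counts toward n
	fmt.Println(classify([]string{"runc init", "/pause"}, "stress-ng")) // false false: ignored restart leftovers
	fmt.Println(classify([]string{"kube-apiserver"}, "stress-ng"))      // false true: unexpected victim, test run fails
}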