From 7f800563325f063ecc86988dd00d56a75b943617 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 31 Oct 2025 22:24:37 +0400 Subject: [PATCH 1/2] feat: implement logs persistence This is a stub which wires logging manager with a controller/volumes, not a real implementation. Fixes #11461 Signed-off-by: Andrey Smirnov --- .../pkg/controllers/block/volume_config.go | 2 +- .../controllers/block/volume_config_test.go | 2 +- .../controllers/runtime/log_persistence.go | 157 ++++++++++++++++++ internal/app/machined/pkg/runtime/logging.go | 13 ++ .../machined/pkg/runtime/logging/circular.go | 48 ++++++ .../app/machined/pkg/runtime/logging/file.go | 3 + .../app/machined/pkg/runtime/logging/null.go | 3 + .../runtime/v1alpha2/v1alpha2_controller.go | 3 + .../machined/pkg/system/services/kubelet.go | 2 +- internal/integration/api/selinux.go | 2 +- pkg/machinery/constants/constants.go | 3 + 11 files changed, 234 insertions(+), 4 deletions(-) create mode 100644 internal/app/machined/pkg/controllers/runtime/log_persistence.go diff --git a/internal/app/machined/pkg/controllers/block/volume_config.go b/internal/app/machined/pkg/controllers/block/volume_config.go index 3dc6633510f..4af78de6694 100644 --- a/internal/app/machined/pkg/controllers/block/volume_config.go +++ b/internal/app/machined/pkg/controllers/block/volume_config.go @@ -464,7 +464,7 @@ func (ctrl *VolumeConfigController) manageStandardVolumes(ctx context.Context, r }{ // /var/log { - Path: "/var/log", + Path: constants.LogMountPoint, Mode: 0o755, SELinuxLabel: "system_u:object_r:var_log_t:s0", }, diff --git a/internal/app/machined/pkg/controllers/block/volume_config_test.go b/internal/app/machined/pkg/controllers/block/volume_config_test.go index 49c607a7cc7..96d86b79aa9 100644 --- a/internal/app/machined/pkg/controllers/block/volume_config_test.go +++ b/internal/app/machined/pkg/controllers/block/volume_config_test.go @@ -157,7 +157,7 @@ func (suite *VolumeConfigSuite) TestReconcileDefaults() { }) ctest.AssertResources(suite, []resource.ID{ - "/var/log", + constants.LogMountPoint, "/var/log/audit", "/var/log/containers", "/var/log/pods", diff --git a/internal/app/machined/pkg/controllers/runtime/log_persistence.go b/internal/app/machined/pkg/controllers/runtime/log_persistence.go new file mode 100644 index 00000000000..7ca60653bac --- /dev/null +++ b/internal/app/machined/pkg/controllers/runtime/log_persistence.go @@ -0,0 +1,157 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync/atomic" + + "github.com/cosi-project/runtime/pkg/controller" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/safe" + "github.com/cosi-project/runtime/pkg/state" + "go.uber.org/zap" + + "github.com/siderolabs/talos/internal/app/machined/pkg/runtime" + "github.com/siderolabs/talos/pkg/machinery/constants" + "github.com/siderolabs/talos/pkg/machinery/resources/block" +) + +// LogPersistenceController is a controller that persists logs in files. +type LogPersistenceController struct { + V1Alpha1Logging runtime.LoggingManager + + // dummy implementation + canLog atomic.Bool +} + +// Name implements controller.Controller interface. +func (ctrl *LogPersistenceController) Name() string { + return "runtime.LogPersistenceController" +} + +// Inputs implements controller.Controller interface. +func (ctrl *LogPersistenceController) Inputs() []controller.Input { + return []controller.Input{ + { + Namespace: block.NamespaceName, + Type: block.VolumeMountStatusType, + Kind: controller.InputStrong, + }, + { + Namespace: block.NamespaceName, + Type: block.VolumeMountRequestType, + Kind: controller.InputDestroyReady, + }, + } +} + +// Outputs implements controller.Controller interface. +func (ctrl *LogPersistenceController) Outputs() []controller.Output { + return []controller.Output{ + { + Type: block.VolumeMountRequestType, + Kind: controller.OutputShared, + }, + } +} + +func (ctrl *LogPersistenceController) WriteLog(id string, line []byte) error { + if !ctrl.canLog.Load() { + // logging is not enabled, drop the log line + return nil + } + + f, err := os.OpenFile(filepath.Join(constants.LogMountPoint, id+".log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("error opening log file for %q: %w", id, err) + } + + if _, err = f.Write(append(line, '\n')); err != nil { + f.Close() + + return fmt.Errorf("error writing log line for %q: %w", id, err) + } + + if err = f.Close(); err != nil { + return fmt.Errorf("error closing log file for %q: %w", id, err) + } + + return nil +} + +func (ctrl *LogPersistenceController) startLogging() { + // [TODO]: here we can start logging activities + ctrl.canLog.Store(true) +} + +func (ctrl *LogPersistenceController) stopLogging() { + // [TODO]: here we should stop all logging activities, close files, flush buffers, etc. + // after this call we should not hold /var/log + ctrl.canLog.Store(false) +} + +// Run implements controller.Controller interface. +// +//nolint:gocyclo +func (ctrl *LogPersistenceController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error { + ctrl.V1Alpha1Logging.SetLineWriter(ctrl) + + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + requestID := ctrl.Name() + "-" + constants.LogMountPoint + + // create a volume mount request for the logs volume mount point + // to keep it alive and prevent it from being torn down + if err := safe.WriterModify(ctx, r, + block.NewVolumeMountRequest(block.NamespaceName, requestID), + func(v *block.VolumeMountRequest) error { + v.TypedSpec().Requester = ctrl.Name() + v.TypedSpec().VolumeID = constants.LogMountPoint + + return nil + }, + ); err != nil { + return fmt.Errorf("error creating volume mount request for user volume mount point: %w", err) + } + + vms, err := safe.ReaderGetByID[*block.VolumeMountStatus](ctx, r, requestID) + if err != nil { + if state.IsNotFoundError(err) { + // volume mount not ready yet, wait more + continue + } + + return fmt.Errorf("error getting volume mount status for log volume: %w", err) + } + + switch vms.Metadata().Phase() { + case resource.PhaseRunning: + if !vms.Metadata().Finalizers().Has(ctrl.Name()) { + if err = r.AddFinalizer(ctx, vms.Metadata(), ctrl.Name()); err != nil { + return fmt.Errorf("error adding finalizer to volume mount status for log volume: %w", err) + } + + ctrl.startLogging() + } + case resource.PhaseTearingDown: + if vms.Metadata().Finalizers().Has(ctrl.Name()) { + ctrl.stopLogging() + + if err = r.RemoveFinalizer(ctx, vms.Metadata(), ctrl.Name()); err != nil { + return fmt.Errorf("error removing finalizer from volume mount status for log volume: %w", err) + } + } + } + } +} diff --git a/internal/app/machined/pkg/runtime/logging.go b/internal/app/machined/pkg/runtime/logging.go index fcac735ed73..044ec537ac2 100644 --- a/internal/app/machined/pkg/runtime/logging.go +++ b/internal/app/machined/pkg/runtime/logging.go @@ -24,6 +24,11 @@ type LoggingManager interface { // SetSenders should be thread-safe. SetSenders(senders []LogSender) []LogSender + // SetLineWriter sets a writer that will receive raw log lines. + // + // SetLineWriter can be only singularly called, subsequent calls override previous writer. + SetLineWriter(w LogWriter) + // RegisteredLogs returns a list of registered logs containers. RegisteredLogs() []string } @@ -88,3 +93,11 @@ type LogSender interface { // Close should be thread-safe. Close(ctx context.Context) error } + +// LogWriter provider common interface for text-based log writers. +type LogWriter interface { + // WriteLog writes a single log line. + // + // WriteLog should be thread-safe. + WriteLog(id string, line []byte) error +} diff --git a/internal/app/machined/pkg/runtime/logging/circular.go b/internal/app/machined/pkg/runtime/logging/circular.go index 6bf24040b9f..4c0ce14ef46 100644 --- a/internal/app/machined/pkg/runtime/logging/circular.go +++ b/internal/app/machined/pkg/runtime/logging/circular.go @@ -49,6 +49,8 @@ type CircularBufferLoggingManager struct { sendersRW sync.RWMutex senders []runtime.LogSender sendersChanged chan struct{} + + lineWriter chan runtime.LogWriter } // NewCircularBufferLoggingManager initializes new CircularBufferLoggingManager. @@ -66,6 +68,7 @@ func NewCircularBufferLoggingManager(fallbackLogger *log.Logger) *CircularBuffer fallbackLogger: fallbackLogger, sendersChanged: make(chan struct{}), compressor: compressor, + lineWriter: make(chan runtime.LogWriter, 1), } } @@ -98,6 +101,16 @@ func (manager *CircularBufferLoggingManager) SetSenders(senders []runtime.LogSen return prevSenders } +// SetLineWriter implements runtime.LoggingManager interface. +func (manager *CircularBufferLoggingManager) SetLineWriter(w runtime.LogWriter) { + select { + case manager.lineWriter <- w: + default: + <-manager.lineWriter + manager.lineWriter <- w + } +} + // getSenders waits for senders to be set and returns them. func (manager *CircularBufferLoggingManager) getSenders() []runtime.LogSender { for { @@ -193,6 +206,18 @@ func (handler *circularHandler) Writer() (io.WriteCloser, error) { handler.manager.fallbackLogger.Printf("log senders stopped: %s", err) } }() + + go func() { + defer func() { + if r := recover(); r != nil { + handler.manager.fallbackLogger.Printf("log writer panic: %v", r) + } + }() + + if err := handler.runLineWriter(); err != nil { + handler.manager.fallbackLogger.Printf("log writer stopped: %s", err) + } + }() } } @@ -325,6 +350,29 @@ func (handler *circularHandler) resend(e *runtime.LogEvent) { } } +func (handler *circularHandler) runLineWriter() error { + r, err := handler.Reader(runtime.WithFollow()) + if err != nil { + return err + } + defer r.Close() //nolint:errcheck + + w := <-handler.manager.lineWriter + handler.manager.lineWriter <- w + + scanner := bufio.NewScanner(r) + for scanner.Scan() { + l := scanner.Bytes() + + err = w.WriteLog(handler.id, l) + if err != nil { + return fmt.Errorf("line writer: %w", err) + } + } + + return fmt.Errorf("scanner: %w", scanner.Err()) +} + // timeStampWriter is a writer that adds a timestamp to each line. type timeStampWriter struct { w io.WriteCloser diff --git a/internal/app/machined/pkg/runtime/logging/file.go b/internal/app/machined/pkg/runtime/logging/file.go index 8b0afb1d61d..856eb63b131 100644 --- a/internal/app/machined/pkg/runtime/logging/file.go +++ b/internal/app/machined/pkg/runtime/logging/file.go @@ -48,6 +48,9 @@ func (manager *FileLoggingManager) SetSenders([]runtime.LogSender) []runtime.Log return nil } +// SetLineWriter implements runtime.LoggingManager interface (by doing nothing). +func (manager *FileLoggingManager) SetLineWriter(runtime.LogWriter) {} + // RegisteredLogs implements runtime.LoggingManager interface. func (manager *FileLoggingManager) RegisteredLogs() []string { var result []string diff --git a/internal/app/machined/pkg/runtime/logging/null.go b/internal/app/machined/pkg/runtime/logging/null.go index 6e32b60636a..e15a98f0621 100644 --- a/internal/app/machined/pkg/runtime/logging/null.go +++ b/internal/app/machined/pkg/runtime/logging/null.go @@ -29,6 +29,9 @@ func (*NullLoggingManager) SetSenders([]runtime.LogSender) []runtime.LogSender { return nil } +// SetLineWriter implements runtime.LoggingManager interface (by doing nothing). +func (*NullLoggingManager) SetLineWriter(runtime.LogWriter) {} + // RegisteredLogs implements runtime.LoggingManager interface (by doing nothing). func (*NullLoggingManager) RegisteredLogs() []string { return nil diff --git a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go index acfd50640ed..30d369e0b5f 100644 --- a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go +++ b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go @@ -424,6 +424,9 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error &runtimecontrollers.KmsgLogDeliveryController{ Drainer: drainer, }, + &runtimecontrollers.LogPersistenceController{ + V1Alpha1Logging: ctrl.v1alpha1Runtime.Logging(), + }, &runtimecontrollers.LoadedKernelModuleController{ V1Alpha1Mode: ctrl.v1alpha1Runtime.State().Platform().Mode(), }, diff --git a/internal/app/machined/pkg/system/services/kubelet.go b/internal/app/machined/pkg/system/services/kubelet.go index 99d51d11738..1c5d5bb1097 100644 --- a/internal/app/machined/pkg/system/services/kubelet.go +++ b/internal/app/machined/pkg/system/services/kubelet.go @@ -110,7 +110,7 @@ func (k *Kubelet) Volumes(runtime.Runtime) []string { return []string{ "/var/lib", "/var/lib/kubelet", - "/var/log", + constants.LogMountPoint, "/var/log/audit", "/var/log/containers", "/var/log/pods", diff --git a/internal/integration/api/selinux.go b/internal/integration/api/selinux.go index f62a03f021f..beb71c1e8d8 100644 --- a/internal/integration/api/selinux.go +++ b/internal/integration/api/selinux.go @@ -111,7 +111,7 @@ func (suite *SELinuxSuite) TestFileMountLabels() { "/var/lib/cni": "system_u:object_r:cni_state_t:s0", "/var/lib/kubelet": "system_u:object_r:kubelet_state_t:s0", "/var/lib/kubelet/seccomp": "system_u:object_r:seccomp_profile_t:s0", - "/var/log": "system_u:object_r:var_log_t:s0", + constants.LogMountPoint: "system_u:object_r:var_log_t:s0", "/var/log/audit": "system_u:object_r:audit_log_t:s0", constants.KubernetesAuditLogDir: "system_u:object_r:kube_log_t:s0", "/var/log/containers": "system_u:object_r:containers_log_t:s0", diff --git a/pkg/machinery/constants/constants.go b/pkg/machinery/constants/constants.go index ba51532bd69..067da4f4185 100644 --- a/pkg/machinery/constants/constants.go +++ b/pkg/machinery/constants/constants.go @@ -1275,6 +1275,9 @@ const ( // UserVolumeMountPoint is the path to the volume mount point for the user volumes. UserVolumeMountPoint = "/var/mnt" + // LogMountPoint is the path to the logs mount point, and ID of the logs volume. + LogMountPoint = "/var/log" + // UserVolumePrefix is the prefix for the user volumes. UserVolumePrefix = "u-" From b15504c966aea09bc3102186a87c08ad0bd808b4 Mon Sep 17 00:00:00 2001 From: Dmitrii Sharshakov Date: Thu, 13 Nov 2025 18:28:33 +0100 Subject: [PATCH 2/2] wip --- .../controllers/runtime/log_persistence.go | 76 ++++++++++++++----- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/internal/app/machined/pkg/controllers/runtime/log_persistence.go b/internal/app/machined/pkg/controllers/runtime/log_persistence.go index 7ca60653bac..ff42f30b484 100644 --- a/internal/app/machined/pkg/controllers/runtime/log_persistence.go +++ b/internal/app/machined/pkg/controllers/runtime/log_persistence.go @@ -9,7 +9,7 @@ import ( "fmt" "os" "path/filepath" - "sync/atomic" + "sync" "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/resource" @@ -26,8 +26,10 @@ import ( type LogPersistenceController struct { V1Alpha1Logging runtime.LoggingManager - // dummy implementation - canLog atomic.Bool + // RLocked by the log writers, Locked by volume handlers + canLog sync.RWMutex + filesMutex sync.Mutex + files map[string]*os.File } // Name implements controller.Controller interface. @@ -62,38 +64,63 @@ func (ctrl *LogPersistenceController) Outputs() []controller.Output { } func (ctrl *LogPersistenceController) WriteLog(id string, line []byte) error { - if !ctrl.canLog.Load() { - // logging is not enabled, drop the log line - return nil - } + var err error + + ctrl.canLog.RLock() + defer ctrl.canLog.RUnlock() - f, err := os.OpenFile(filepath.Join(constants.LogMountPoint, id+".log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) - if err != nil { - return fmt.Errorf("error opening log file for %q: %w", id, err) + f, ok := ctrl.files[id] + if !ok { + fmt.Println("LOGGING open", id) + f, err = os.OpenFile(filepath.Join(constants.LogMountPoint, id+".log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + fmt.Println("LOGGING open err", err) + return fmt.Errorf("error opening log file for %q: %w", id, err) + } + + fmt.Println("LOGGING map lock", id) + ctrl.filesMutex.Lock() + ctrl.files[id] = f + fmt.Println("LOGGING map unlock", id) + ctrl.filesMutex.Unlock() } + // fmt.Println("LOGGING write", id) if _, err = f.Write(append(line, '\n')); err != nil { - f.Close() + fmt.Println("LOGGING err", err) return fmt.Errorf("error writing log line for %q: %w", id, err) } - if err = f.Close(); err != nil { - return fmt.Errorf("error closing log file for %q: %w", id, err) - } - return nil } func (ctrl *LogPersistenceController) startLogging() { - // [TODO]: here we can start logging activities - ctrl.canLog.Store(true) + // here we can start logging activities + fmt.Println("LOGGING ctrl.canLog.Unlock") + ctrl.canLog.Unlock() } -func (ctrl *LogPersistenceController) stopLogging() { - // [TODO]: here we should stop all logging activities, close files, flush buffers, etc. +func (ctrl *LogPersistenceController) stopLogging() error { + // Stop all logging activities, close files // after this call we should not hold /var/log - ctrl.canLog.Store(false) + fmt.Println("LOGGING stop", &ctrl.canLog) + ctrl.canLog.Lock() + fmt.Println("LOGGING stop, canLog locked") + ctrl.filesMutex.Lock() + fmt.Println("LOGGING stop, filesMutex locked") + defer ctrl.filesMutex.Unlock() + + for id := range ctrl.files { + fmt.Println("LOGGING close", id) + if err := ctrl.files[id].Close(); err != nil { + fmt.Println("LOGGING close err", err) + return fmt.Errorf("error closing log file for %q: %w", id, err) + } + delete(ctrl.files, id) + } + + return nil } // Run implements controller.Controller interface. @@ -102,6 +129,11 @@ func (ctrl *LogPersistenceController) stopLogging() { func (ctrl *LogPersistenceController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error { ctrl.V1Alpha1Logging.SetLineWriter(ctrl) + ctrl.files = make(map[string]*os.File) + // Block writes until /var/log is ready + ctrl.canLog.Lock() + fmt.Println("LOGGING ctrl.canLog.Lock") + for { select { case <-ctx.Done(): @@ -146,7 +178,9 @@ func (ctrl *LogPersistenceController) Run(ctx context.Context, r controller.Runt } case resource.PhaseTearingDown: if vms.Metadata().Finalizers().Has(ctrl.Name()) { - ctrl.stopLogging() + if err = ctrl.stopLogging(); err != nil { + return fmt.Errorf("error stopping persistent logging: %w", err) + } if err = r.RemoveFinalizer(ctx, vms.Metadata(), ctrl.Name()); err != nil { return fmt.Errorf("error removing finalizer from volume mount status for log volume: %w", err)