diff --git a/.golangci.yml b/.golangci.yml index d830af46b8..4ff3588153 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -764,7 +764,7 @@ linters-settings: - github.com/jmoiron/sqlx sloglint: # Enforce not mixing key-value pairs and attributes. Default: true - no-mixed-args: true + no-mixed-args: false # Enforce using key-value pairs only (overrides no-mixed-args, incompatible with attr-only). Default: false kv-only: false # Enforce using attributes only (overrides no-mixed-args, incompatible with kv-only). Default: false diff --git a/cmd/agent/main.go b/cmd/agent/main.go index a3aafb948a..00773b8906 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -18,6 +18,8 @@ import ( "github.com/nginx/agent/v3/internal/config" ) +var logOrigin = slog.String("log_origin", "main.go") + var ( // set at buildtime commit = "" @@ -32,7 +34,7 @@ func main() { go func() { select { case <-sigChan: - slog.WarnContext(ctx, "NGINX Agent exiting") + slog.WarnContext(ctx, "NGINX Agent exiting", logOrigin) cancel() time.Sleep(config.DefGracefulShutdownPeriod) @@ -41,6 +43,7 @@ func main() { "Failed to gracefully shutdown within timeout of %v. Exiting", config.DefGracefulShutdownPeriod, ), + logOrigin, ) os.Exit(1) case <-ctx.Done(): @@ -51,6 +54,6 @@ func main() { err := app.Run(ctx) if err != nil { - slog.ErrorContext(ctx, "NGINX Agent exiting due to error", "error", err) + slog.ErrorContext(ctx, "NGINX Agent exiting due to error", "error", err, logOrigin) } } diff --git a/internal/app.go b/internal/app.go index 6f10aa1bb5..0db7db100f 100644 --- a/internal/app.go +++ b/internal/app.go @@ -15,6 +15,8 @@ import ( "github.com/spf13/cobra" ) +var logOrigin = slog.String("log_origin", "app.go") + const ( defaultMessagePipeChannelSize = 100 defaultQueueSize = 100 @@ -35,25 +37,26 @@ func (a *App) Run(ctx context.Context) error { config.RegisterRunner(func(_ *cobra.Command, _ []string) { err := config.RegisterConfigFile() if err != nil { - slog.ErrorContext(ctx, "Failed to load configuration file", "error", err) + slog.ErrorContext(ctx, "Failed to load configuration file", "error", err, logOrigin) return } agentConfig, err := config.ResolveConfig() if err != nil { - slog.ErrorContext(ctx, "Invalid config", "error", err) + slog.ErrorContext(ctx, "Invalid config", "error", err, logOrigin) return } slog.InfoContext(ctx, "Starting NGINX Agent", slog.String("version", a.version), slog.String("commit", a.commit), + logOrigin, ) messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize) err = messagePipe.Register(defaultQueueSize, plugin.LoadPlugins(ctx, agentConfig)) if err != nil { - slog.ErrorContext(ctx, "Failed to register plugins", "error", err) + slog.ErrorContext(ctx, "Failed to register plugins", "error", err, logOrigin) return } diff --git a/internal/bus/message_pipe.go b/internal/bus/message_pipe.go index 483b39456d..8ba63c1ede 100644 --- a/internal/bus/message_pipe.go +++ b/internal/bus/message_pipe.go @@ -13,6 +13,8 @@ import ( messagebus "github.com/vardius/message-bus" ) +var logOrigin = slog.String("log_origin", "message_pipe.go") + type ( Payload interface{} @@ -81,7 +83,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin) error { pluginsRegistered = append(pluginsRegistered, plugin.Info().Name) } - slog.Info("Finished registering plugins", "plugins", pluginsRegistered) + slog.Info("Finished registering plugins", "plugins", pluginsRegistered, logOrigin) return nil } @@ -195,7 +197,7 @@ func (p *MessagePipe) initPlugins(ctx context.Context) { for index, plugin := range p.plugins { err := plugin.Init(ctx, p) if err != nil { - slog.ErrorContext(ctx, "Failed to initialize plugin", "plugin", plugin.Info().Name, "error", err) + slog.ErrorContext(ctx, "Failed to initialize plugin", "plugin", plugin.Info().Name, "error", err, logOrigin) unsubscribeError := p.unsubscribePlugin(ctx, index, plugin) if unsubscribeError != nil { @@ -204,6 +206,7 @@ func (p *MessagePipe) initPlugins(ctx context.Context) { "Failed to unsubscribe plugin", "plugin", plugin.Info().Name, "error", unsubscribeError, + logOrigin, ) } } diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index 4956bb0c5a..f6077212bd 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -53,6 +53,8 @@ type ( } ) +var pluginLogOrigin = slog.String("log_origin", "otel_collector_plugin.go") + var ( _ bus.Plugin = (*Collector)(nil) initMutex = &sync.Mutex{} @@ -101,14 +103,16 @@ func (oc *Collector) GetState() otelcol.State { // Init initializes and starts the plugin func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) error { - slog.InfoContext(ctx, "Starting OTel Collector plugin") + slog.InfoContext(ctx, "Starting OTel Collector plugin", pluginLogOrigin) var runCtx context.Context runCtx, oc.cancel = context.WithCancel(ctx) if !oc.config.AreReceiversConfigured() { slog.InfoContext(runCtx, "No receivers configured for OTel Collector. "+ - "Waiting to discover a receiver before starting OTel collector.") + "Waiting to discover a receiver before starting OTel collector.", + pluginLogOrigin, + ) return nil } @@ -128,7 +132,7 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro bootErr := oc.bootup(runCtx) if bootErr != nil { - slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr) + slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr, pluginLogOrigin) } return nil @@ -138,7 +142,9 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) { for _, receiver := range receivers { if receiver.OtlpTLSConfig == nil { - slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.") + slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.", + pluginLogOrigin) + continue } @@ -146,21 +152,22 @@ func (oc *Collector) processReceivers(ctx context.Context, receivers []config.Ot slog.WarnContext(ctx, "Self-signed certificate for OTel receiver requested, "+ "this is not recommended for production environments.", + pluginLogOrigin, ) if receiver.OtlpTLSConfig.ExistingCert { slog.WarnContext(ctx, - "Certificate file already exists, skipping self-signed certificate generation", - ) + "Certificate file already exists, skipping self-signed certificate generation", pluginLogOrigin) } } else { - slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.") + slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.", + pluginLogOrigin) } } } func (oc *Collector) bootup(ctx context.Context) error { - slog.InfoContext(ctx, "Starting OTel collector") + slog.InfoContext(ctx, "Starting OTel collector", pluginLogOrigin) errChan := make(chan error) go func() { @@ -168,7 +175,7 @@ func (oc *Collector) bootup(ctx context.Context) error { if appErr != nil { errChan <- appErr } - slog.InfoContext(ctx, "OTel collector run finished") + slog.InfoContext(ctx, "OTel collector run finished", pluginLogOrigin) }() for { @@ -204,10 +211,10 @@ func (oc *Collector) Info() *bus.Info { // Close the plugin. func (oc *Collector) Close(ctx context.Context) error { - slog.InfoContext(ctx, "Closing OTel Collector plugin") + slog.InfoContext(ctx, "Closing OTel Collector plugin", pluginLogOrigin) if !oc.stopped { - slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState()) + slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState(), pluginLogOrigin) oc.service.Shutdown() oc.cancel() @@ -222,9 +229,12 @@ func (oc *Collector) Close(ctx context.Context) error { }) if err != nil { - slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState()) + slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", + "error", err, "state", oc.service.GetState(), + pluginLogOrigin, + ) } else { - slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState()) + slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState(), pluginLogOrigin) oc.stopped = true } } @@ -240,7 +250,7 @@ func (oc *Collector) Process(ctx context.Context, msg *bus.Message) { case bus.ResourceUpdateTopic: oc.handleResourceUpdate(ctx, msg) default: - slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic) + slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic, pluginLogOrigin) } } @@ -258,17 +268,19 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", + "payload", msg.Data, pluginLogOrigin) + return } reloadCollector := oc.checkForNewReceivers(nginxConfigContext) if reloadCollector { - slog.InfoContext(ctx, "Reloading OTel collector config") + slog.InfoContext(ctx, "Reloading OTel collector config", pluginLogOrigin) err := writeCollectorConfig(oc.config.Collector) if err != nil { - slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err) + slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err, pluginLogOrigin) return } @@ -282,7 +294,7 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) resourceUpdateContext, ok := msg.Data.(*v1.Resource) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data, pluginLogOrigin) return } @@ -290,10 +302,10 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) headersSetterExtensionUpdated := oc.updateHeadersSetterExtension(ctx, resourceUpdateContext) if resourceProcessorUpdated || headersSetterExtensionUpdated { - slog.InfoContext(ctx, "Reloading OTel collector config") + slog.InfoContext(ctx, "Reloading OTel collector config", pluginLogOrigin) err := writeCollectorConfig(oc.config.Collector) if err != nil { - slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err) + slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err, pluginLogOrigin) return } @@ -343,9 +355,9 @@ func (oc *Collector) updateHeadersSetterExtension( } if !isUUIDHeaderSet { - slog.DebugContext( - ctx, "Adding uuid header to OTel collector", + slog.DebugContext(ctx, "Adding uuid header to OTel collector", "uuid", resourceUpdateContext.GetResourceId(), + pluginLogOrigin, ) oc.config.Collector.Extensions.HeadersSetter.Headers = append( oc.config.Collector.Extensions.HeadersSetter.Headers, @@ -366,14 +378,14 @@ func (oc *Collector) updateHeadersSetterExtension( func (oc *Collector) restartCollector(ctx context.Context) { err := oc.Close(ctx) if err != nil { - slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err) + slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, pluginLogOrigin) return } settings := OTelCollectorSettings(oc.config) oTelCollector, err := otelcol.NewCollector(settings) if err != nil { - slog.ErrorContext(ctx, "Failed to create OTel Collector", "error", err) + slog.ErrorContext(ctx, "Failed to create OTel Collector", "error", err, pluginLogOrigin) return } oc.service = oTelCollector @@ -382,13 +394,14 @@ func (oc *Collector) restartCollector(ctx context.Context) { runCtx, oc.cancel = context.WithCancel(ctx) if !oc.stopped { - slog.ErrorContext(ctx, "Unable to restart OTel collector, failed to stop collector") + slog.ErrorContext(ctx, "Unable to restart OTel collector, failed to stop collector", pluginLogOrigin) + return } bootErr := oc.bootup(runCtx) if bootErr != nil { - slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr) + slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr, pluginLogOrigin) } } diff --git a/internal/collector/settings.go b/internal/collector/settings.go index 62e92afbb7..d148ce116c 100644 --- a/internal/collector/settings.go +++ b/internal/collector/settings.go @@ -26,6 +26,8 @@ const ( configFilePermission = 0o600 ) +var settingsLogOrigin = slog.String("log_origin", "settings.go") + //go:embed otelcol.tmpl var otelcolTemplate string @@ -115,7 +117,7 @@ func writeCollectorConfig(conf *config.Collector) error { defer func() { err = file.Close() if err != nil { - slog.Warn("Failed to close file", "file_path", confPath) + slog.Warn("Failed to close file", "file_path", confPath, settingsLogOrigin) } }() if err != nil { diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index a1b656fa57..bf7f100ce7 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -21,6 +21,8 @@ import ( "github.com/nginx/agent/v3/pkg/id" ) +var pluginLogOrigin = slog.String("log_origin", "command_plugin.go") + var _ bus.Plugin = (*CommandPlugin)(nil) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate @@ -57,7 +59,7 @@ func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnec } func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { - slog.DebugContext(ctx, "Starting command plugin") + slog.DebugContext(ctx, "Starting command plugin", pluginLogOrigin) cp.messagePipe = messagePipe cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel) @@ -68,7 +70,7 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn } func (cp *CommandPlugin) Close(ctx context.Context) error { - slog.InfoContext(ctx, "Closing command plugin") + slog.InfoContext(ctx, "Closing command plugin", pluginLogOrigin) cp.subscribeMutex.Lock() if cp.subscribeCancel != nil { @@ -98,7 +100,7 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) { case bus.DataPlaneResponseTopic: cp.processDataPlaneResponse(ctx, msg) default: - slog.DebugContext(ctx, "Command plugin unknown topic", "topic", msg.Topic) + slog.DebugContext(ctx, "Command plugin unknown topic", "topic", msg.Topic, pluginLogOrigin) } } @@ -109,7 +111,7 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes } else { statusErr := cp.commandService.UpdateDataPlaneStatus(ctx, resource) if statusErr != nil { - slog.ErrorContext(ctx, "Unable to update data plane status", "error", statusErr) + slog.ErrorContext(ctx, "Unable to update data plane status", "error", statusErr, pluginLogOrigin) } } } @@ -120,7 +122,7 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res createConnectionResponse, err := cp.commandService.CreateConnection(ctx, resource) if err != nil { - slog.ErrorContext(ctx, "Unable to create connection", "error", err) + slog.ErrorContext(ctx, "Unable to create connection", "error", err, pluginLogOrigin) } if createConnectionResponse != nil { @@ -142,7 +144,7 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me err := cp.commandService.UpdateDataPlaneHealth(ctx, instances) correlationID := logger.GetCorrelationID(ctx) if err != nil { - slog.ErrorContext(ctx, "Unable to update data plane health", "error", err) + slog.ErrorContext(ctx, "Unable to update data plane health", "error", err, pluginLogOrigin) cp.messagePipe.Process(ctx, &bus.Message{ Topic: bus.DataPlaneResponseTopic, Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, @@ -161,7 +163,7 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok { err := cp.commandService.UpdateDataPlaneHealth(ctx, instances) if err != nil { - slog.ErrorContext(ctx, "Unable to update data plane health", "error", err) + slog.ErrorContext(ctx, "Unable to update data plane health", "error", err, pluginLogOrigin) } } } @@ -170,25 +172,26 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus. if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok { err := cp.commandService.SendDataPlaneResponse(ctx, response) if err != nil { - slog.ErrorContext(ctx, "Unable to send data plane response", "error", err) + slog.ErrorContext(ctx, "Unable to send data plane response", "error", err, pluginLogOrigin) } } } func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Command plugin received connection reset") + slog.DebugContext(ctx, "Command plugin received connection reset", pluginLogOrigin) if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { connectionErr := cp.conn.Close(ctx) if connectionErr != nil { - slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", connectionErr) + slog.ErrorContext(ctx, "Command plugin: unable to close connection", + "error", connectionErr, pluginLogOrigin) } cp.conn = newConnection err := cp.commandService.UpdateClient(ctx, cp.conn.CommandServiceClient()) if err != nil { - slog.ErrorContext(ctx, "Failed to reset connection", "error", err) + slog.ErrorContext(ctx, "Failed to reset connection", "error", err, pluginLogOrigin) return } - slog.DebugContext(ctx, "Command service client reset successfully") + slog.DebugContext(ctx, "Command service client reset successfully", pluginLogOrigin) } } @@ -213,7 +216,7 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) { logger.CorrelationIDContextKey, slog.Any(logger.CorrelationIDKey, message.GetMessageMeta().GetCorrelationId()), ) - slog.DebugContext(newCtx, "Received management plane request", "request", message) + slog.DebugContext(newCtx, "Received management plane request", "request", message, pluginLogOrigin) switch message.GetRequest().(type) { case *mpi.ManagementPlaneRequest_ConfigUploadRequest: @@ -225,7 +228,7 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) { case *mpi.ManagementPlaneRequest_ActionRequest: cp.handleAPIActionRequest(newCtx, message) default: - slog.DebugContext(newCtx, "Management plane request not implemented yet") + slog.DebugContext(newCtx, "Management plane request not implemented yet", pluginLogOrigin) } } } @@ -239,6 +242,7 @@ func (cp *CommandPlugin) handleAPIActionRequest(ctx context.Context, message *mp ctx, "API action feature disabled. Unable to process API action request", "request", message, "enabled_features", cp.config.Features, + pluginLogOrigin, ) err := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{ @@ -251,7 +255,7 @@ func (cp *CommandPlugin) handleAPIActionRequest(ctx context.Context, message *mp InstanceId: message.GetActionRequest().GetInstanceId(), }) if err != nil { - slog.ErrorContext(ctx, "Unable to send data plane response", "error", err) + slog.ErrorContext(ctx, "Unable to send data plane response", "error", err, pluginLogOrigin) } } } @@ -264,6 +268,7 @@ func (cp *CommandPlugin) handleConfigApplyRequest(newCtx context.Context, messag newCtx, "Configuration feature disabled. Unable to process config apply request", "request", message, "enabled_features", cp.config.Features, + pluginLogOrigin, ) err := cp.commandService.SendDataPlaneResponse(newCtx, &mpi.DataPlaneResponse{ @@ -276,7 +281,7 @@ func (cp *CommandPlugin) handleConfigApplyRequest(newCtx context.Context, messag InstanceId: message.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId(), }) if err != nil { - slog.ErrorContext(newCtx, "Unable to send data plane response", "error", err) + slog.ErrorContext(newCtx, "Unable to send data plane response", "error", err, pluginLogOrigin) } } } @@ -289,6 +294,7 @@ func (cp *CommandPlugin) handleConfigUploadRequest(newCtx context.Context, messa newCtx, "Configuration feature disabled. Unable to process config upload request", "request", message, "enabled_features", cp.config.Features, + pluginLogOrigin, ) err := cp.commandService.SendDataPlaneResponse(newCtx, &mpi.DataPlaneResponse{ @@ -301,7 +307,7 @@ func (cp *CommandPlugin) handleConfigUploadRequest(newCtx context.Context, messa InstanceId: message.GetConfigUploadRequest().GetOverview().GetConfigVersion().GetInstanceId(), }) if err != nil { - slog.ErrorContext(newCtx, "Unable to send data plane response", "error", err) + slog.ErrorContext(newCtx, "Unable to send data plane response", "error", err, pluginLogOrigin) } } } diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 2aa80284c8..ded8ea7e09 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -31,6 +31,8 @@ import ( var _ commandService = (*CommandService)(nil) +var serviceLogOrigin = slog.String("log_origin", "command_service.go") + const ( createConnectionMaxElapsedTime = 0 ) @@ -96,8 +98,11 @@ func (cs *CommandService) UpdateDataPlaneStatus( defer backoffCancel() sendDataPlaneStatus := func() (*mpi.UpdateDataPlaneStatusResponse, error) { - slog.DebugContext(ctx, "Sending data plane status update request", "request", request, - "parent_correlation_id", correlationID) + slog.DebugContext(ctx, "Sending data plane status update request", + "request", request, + "parent_correlation_id", correlationID, + serviceLogOrigin, + ) cs.subscribeClientMutex.Lock() if cs.commandServiceClient == nil { @@ -109,8 +114,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( validatedError := grpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update data plane status", "error", validatedError) - + slog.ErrorContext(ctx, "Failed to send update data plane status", "error", validatedError, serviceLogOrigin) return nil, validatedError } @@ -124,7 +128,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( if err != nil { return err } - slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response) + slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response, serviceLogOrigin) cs.resourceMutex.Lock() defer cs.resourceMutex.Unlock() @@ -160,13 +164,13 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea return err } - slog.DebugContext(ctx, "UpdateDataPlaneHealth response", "response", response) + slog.DebugContext(ctx, "UpdateDataPlaneHealth response", "response", response, serviceLogOrigin) return err } func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error { - slog.DebugContext(ctx, "Sending data plane response", "response", response) + slog.DebugContext(ctx, "Sending data plane response", "response", response, serviceLogOrigin) backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() @@ -198,7 +202,8 @@ func (cs *CommandService) Subscribe(ctx context.Context) { default: retryError := backoff.Retry(cs.receiveCallback(ctx), backoffHelpers.Context(ctx, commonSettings)) if retryError != nil { - slog.WarnContext(ctx, "Failed to receive messages from subscribe stream", "error", retryError) + slog.WarnContext(ctx, "Failed to receive messages from subscribe stream", + "error", retryError, serviceLogOrigin) } } } @@ -210,7 +215,7 @@ func (cs *CommandService) CreateConnection( ) (*mpi.CreateConnectionResponse, error) { correlationID := logger.GetCorrelationID(ctx) if len(resource.GetInstances()) <= 1 { - slog.InfoContext(ctx, "No Data Plane Instance found") + slog.InfoContext(ctx, "No Data Plane Instance found", serviceLogOrigin) } if cs.isConnected.Load() { @@ -234,7 +239,7 @@ func (cs *CommandService) CreateConnection( Multiplier: cs.agentConfig.Client.Backoff.Multiplier, } - slog.DebugContext(ctx, "Sending create connection request", "request", request) + slog.DebugContext(ctx, "Sending create connection request", "request", request, serviceLogOrigin) response, err := backoff.RetryWithData( cs.connectCallback(ctx, request), backoffHelpers.Context(ctx, commonSettings), @@ -243,8 +248,8 @@ func (cs *CommandService) CreateConnection( return nil, err } - slog.InfoContext(ctx, "Connection created", "response", response) - slog.InfoContext(ctx, "Agent connected") + slog.InfoContext(ctx, "Connection created", "response", response, serviceLogOrigin) + slog.InfoContext(ctx, "Agent connected", serviceLogOrigin) cs.isConnected.Store(true) @@ -265,7 +270,7 @@ func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandSe if err != nil { return err } - slog.InfoContext(ctx, "Successfully sent create connection request", "response", resp) + slog.InfoContext(ctx, "Successfully sent create connection request", "response", resp, serviceLogOrigin) return nil } @@ -285,8 +290,7 @@ func (cs *CommandService) sendDataPlaneResponseCallback( err := cs.subscribeClient.Send(response) if err != nil { - slog.ErrorContext(ctx, "Failed to send data plane response", "error", err) - + slog.ErrorContext(ctx, "Failed to send data plane response", "error", err, serviceLogOrigin) return err } @@ -341,6 +345,7 @@ func (cs *CommandService) sendResponseForQueuedConfigApplyRequests( ctx, "Sending data plane response for queued config apply request", "response", newResponse, + serviceLogOrigin, ) backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) @@ -350,7 +355,7 @@ func (cs *CommandService) sendResponseForQueuedConfigApplyRequests( backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) if err != nil { - slog.ErrorContext(ctx, "Failed to send data plane response", "error", err) + slog.ErrorContext(ctx, "Failed to send data plane response", "error", err, serviceLogOrigin) backoffCancel() return err @@ -360,7 +365,10 @@ func (cs *CommandService) sendResponseForQueuedConfigApplyRequests( } cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:] - slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID]) + slog.DebugContext(ctx, "Removed config apply requests from queue", + "queue", cs.configApplyRequestQueue[instanceID], + serviceLogOrigin, + ) if len(cs.configApplyRequestQueue[instanceID]) > 0 { cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1] @@ -375,7 +383,7 @@ func (cs *CommandService) dataPlaneHealthCallback( request *mpi.UpdateDataPlaneHealthRequest, ) func() (*mpi.UpdateDataPlaneHealthResponse, error) { return func() (*mpi.UpdateDataPlaneHealthResponse, error) { - slog.DebugContext(ctx, "Sending data plane health update request", "request", request) + slog.DebugContext(ctx, "Sending data plane health update request", "request", request, serviceLogOrigin) cs.subscribeClientMutex.Lock() if cs.commandServiceClient == nil { @@ -389,7 +397,7 @@ func (cs *CommandService) dataPlaneHealthCallback( validatedError := grpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update data plane health", "error", validatedError) + slog.ErrorContext(ctx, "Failed to send update data plane health", "error", validatedError, serviceLogOrigin) return nil, validatedError } @@ -453,17 +461,21 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e if ok && codeError.Code() == codes.Unavailable { cs.isConnected.Store(false) - slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+ - "Trying create connection rpc", errorMsg), "error", err) + slog.ErrorContext( + ctx, + fmt.Sprintf("Failed to %s, rpc unavailable. Trying create connection rpc", errorMsg), + "error", err, + serviceLogOrigin, + ) _, connectionErr := cs.CreateConnection(ctx, cs.resource) if connectionErr != nil { - slog.ErrorContext(ctx, "Unable to create connection", "error", err) + slog.ErrorContext(ctx, "Unable to create connection", "error", err, serviceLogOrigin) } return nil } - slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err) + slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err, serviceLogOrigin) return err } @@ -481,6 +493,7 @@ func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request ctx, "Config apply request is already in progress, queuing new config apply request", "request", request, + serviceLogOrigin, ) } } @@ -526,6 +539,7 @@ func (cs *CommandService) checkIfInstanceExists( "Unable to handle request, instance not found", "instance", requestInstanceID, "request", request, + serviceLogOrigin, ) response := &mpi.DataPlaneResponse{ @@ -543,7 +557,7 @@ func (cs *CommandService) checkIfInstanceExists( } err := cs.SendDataPlaneResponse(ctx, response) if err != nil { - slog.ErrorContext(ctx, "Failed to send data plane response", "error", err) + slog.ErrorContext(ctx, "Failed to send data plane response", "error", err, serviceLogOrigin) } } @@ -562,7 +576,7 @@ func (cs *CommandService) connectCallback( validatedError := grpc.ValidateGrpcError(connectErr) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to create connection", "error", validatedError) + slog.ErrorContext(ctx, "Failed to create connection", "error", validatedError, serviceLogOrigin) return nil, validatedError } diff --git a/internal/config/command.go b/internal/config/command.go index 03b9b9193c..8f2612f442 100644 --- a/internal/config/command.go +++ b/internal/config/command.go @@ -12,6 +12,8 @@ import ( "github.com/spf13/cobra" ) +var commandLogOrigin = slog.String("log_origin", "command.go") + var RootCommand = &cobra.Command{ Use: "nginx-agent [flags]", Short: "nginx-agent", @@ -67,7 +69,7 @@ $ nginx-agent completion fish > ~/.config/fish/completions/nginx-agent.fish } if err != nil { - slog.Warn("Error sending command", "error", err) + slog.Warn("Error sending command", "error", err, commandLogOrigin) } }, } diff --git a/internal/config/config.go b/internal/config/config.go index 8f9a41a4db..ede16e38f8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,6 +39,8 @@ const ( AgentDirName = "/etc/nginx-agent/" ) +var configLogOrigin = slog.String("log_origin", "config.go") + var viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) func RegisterRunner(r func(cmd *cobra.Command, args []string)) { @@ -65,7 +67,7 @@ func RegisterConfigFile() error { return err } - slog.Debug("Configuration file loaded", "config_path", configPath) + slog.Debug("Configuration file loaded", "config_path", configPath, configLogOrigin) viperInstance.Set(ConfigPathKey, configPath) exePath, err := os.Executable() @@ -90,7 +92,7 @@ func ResolveConfig() (*Config, error) { // Check directories in allowed_directories are valid for _, dir := range directories { if dir == "" || !filepath.IsAbs(dir) { - slog.Warn("Invalid directory: ", "dir", dir) + slog.Warn("Invalid directory: ", "dir", dir, configLogOrigin) continue } @@ -100,7 +102,7 @@ func ResolveConfig() (*Config, error) { allowedDirs = append(allowedDirs, dir) } - slog.Info("Configured allowed directories", "allowed_directories", allowedDirs) + slog.Info("Configured allowed directories", "allowed_directories", allowedDirs, configLogOrigin) // Collect all parsing errors before returning the error, so the user sees all issues with config // in one error message. @@ -128,9 +130,11 @@ func ResolveConfig() (*Config, error) { checkCollectorConfiguration(collector, config) - slog.Debug("Agent config", "config", config) + slog.Debug("Agent config", "config", config, configLogOrigin) slog.Info("Excluded files from being watched for file changes", "exclude_files", - config.Watchers.FileWatcher.ExcludeFiles) + config.Watchers.FileWatcher.ExcludeFiles, + configLogOrigin, + ) return config, nil } @@ -138,8 +142,8 @@ func ResolveConfig() (*Config, error) { func checkCollectorConfiguration(collector *Collector, config *Config) { if isOTelExporterConfigured(collector) && config.IsGrpcClientConfigured() && config.IsAuthConfigured() && config.IsTLSConfigured() { - slog.Info("No collector configuration found in NGINX Agent config, command server configuration found." + - "Using default collector configuration") + slog.Info("No collector configuration found in NGINX Agent config, command server configuration found."+ + "Using default collector configuration", configLogOrigin) defaultCollector(collector, config) } } @@ -147,11 +151,14 @@ func checkCollectorConfiguration(collector *Collector, config *Config) { func defaultCollector(collector *Collector, config *Config) { token := config.Command.Auth.Token if config.Command.Auth.TokenPath != "" { - slog.Debug("Reading token from file", "path", config.Command.Auth.TokenPath) + slog.Debug("Reading token from file", "path", config.Command.Auth.TokenPath, configLogOrigin) pathToken, err := file.ReadFromFile(config.Command.Auth.TokenPath) if err != nil { slog.Error("Error adding token to default collector, "+ - "default collector configuration not started", "error", err) + "default collector configuration not started", + "error", err, + configLogOrigin, + ) return } @@ -293,7 +300,7 @@ func registerFlags() { } err := viperInstance.BindEnv(flag.Name) if err != nil { - slog.Warn("Error occurred binding env", "env", flag.Name, "error", err) + slog.Warn("Error occurred binding env", "env", flag.Name, "error", err, configLogOrigin) } }) } @@ -536,7 +543,7 @@ func getConfigFilePaths() []string { if err == nil { paths = append(paths, path) } else { - slog.Warn("Unable to determine process's current directory", "error", err) + slog.Warn("Unable to determine process's current directory", "error", err, configLogOrigin) } return paths @@ -627,7 +634,7 @@ func resolveLabels() map[string]interface{} { } } - slog.Info("Configured labels", "labels", result) + slog.Info("Configured labels", "labels", result, configLogOrigin) return result } @@ -643,7 +650,7 @@ func resolveEnvironmentVariableLabels() map[string]string { if len(splitLabel) == KeyValueNumber { envLabels[splitLabel[0]] = splitLabel[1] } else { - slog.Warn("Unable to parse label ", "label", label) + slog.Warn("Unable to parse label ", "label", label, configLogOrigin) } } } @@ -925,6 +932,7 @@ func resolveCommand() *Command { slog.Error( "Invalid value for command server type, defaulting to gRPC server type", "server_type", viperInstance.GetString(CommandServerTypeKey), + configLogOrigin, ) } diff --git a/internal/config/mapper.go b/internal/config/mapper.go index beda9352af..716be6e4e7 100644 --- a/internal/config/mapper.go +++ b/internal/config/mapper.go @@ -11,6 +11,8 @@ import ( mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" ) +var mapperLogOrigin = slog.String("log_origin", "mapper.go") + // FromCommandProto maps the AgentConfig Command struct to the Command proto message func FromCommandProto(config *mpi.CommandServer) *Command { cmd := &Command{} @@ -40,7 +42,8 @@ func FromCommandProto(config *mpi.CommandServer) *Command { SkipVerify: config.GetTls().GetSkipVerify(), } if cmd.TLS.SkipVerify { - slog.Warn("Insecure setting SkipVerify, this tells the server to accept a certificate with any hostname.") + slog.Warn("Insecure setting SkipVerify, this tells the server to accept a certificate with any hostname.", + mapperLogOrigin) } } else { cmd.TLS = nil diff --git a/internal/datasource/host/exec/exec.go b/internal/datasource/host/exec/exec.go index 8a160e2635..b8588350c1 100644 --- a/internal/datasource/host/exec/exec.go +++ b/internal/datasource/host/exec/exec.go @@ -17,6 +17,8 @@ import ( "github.com/shirou/gopsutil/v4/host" ) +var logOrigin = slog.String("log_origin", "exec.go") + //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate //counterfeiter:generate . ExecInterface type ExecInterface interface { @@ -70,7 +72,7 @@ func (*Exec) HostID(ctx context.Context) (string, error) { func (*Exec) ReleaseInfo(ctx context.Context) (releaseInfo *v1.ReleaseInfo) { hostInfo, err := host.InfoWithContext(ctx) if err != nil { - slog.ErrorContext(ctx, "Could not read release information for host", "error", err) + slog.ErrorContext(ctx, "Could not read release information for host", "error", err, logOrigin) return &v1.ReleaseInfo{} } diff --git a/internal/datasource/host/info.go b/internal/datasource/host/info.go index efa01b003f..a334b42e95 100644 --- a/internal/datasource/host/info.go +++ b/internal/datasource/host/info.go @@ -49,6 +49,8 @@ const ( GetSystemUUIDKey = "GetSystemUUIDKey" ) +var logOrigin = slog.String("log_origin", "info.go") + var ( singleflightGroup = &singleflight.Group{} @@ -115,7 +117,7 @@ func (i *Info) IsContainer() bool { }) if err != nil { - slog.Warn("Unable to determine if resource is a container or not", "error", err) + slog.Warn("Unable to determine if resource is a container or not", "error", err, logOrigin) return false } @@ -137,7 +139,7 @@ func (i *Info) ResourceID(ctx context.Context) string { func (i *Info) ContainerInfo(ctx context.Context) *v1.Resource_ContainerInfo { hostname, err := i.exec.Hostname() if err != nil { - slog.WarnContext(ctx, "Unable to get hostname", "error", err) + slog.WarnContext(ctx, "Unable to get hostname", "error", err, logOrigin) } return &v1.Resource_ContainerInfo{ @@ -152,7 +154,7 @@ func (i *Info) ContainerInfo(ctx context.Context) *v1.Resource_ContainerInfo { func (i *Info) HostInfo(ctx context.Context) *v1.Resource_HostInfo { hostname, err := i.exec.Hostname() if err != nil { - slog.WarnContext(ctx, "Unable to get hostname", "error", err) + slog.WarnContext(ctx, "Unable to get hostname", "error", err, logOrigin) } return &v1.Resource_HostInfo{ @@ -167,7 +169,7 @@ func (i *Info) HostInfo(ctx context.Context) *v1.Resource_HostInfo { func containsContainerReference(cgroupFile string) bool { data, err := os.ReadFile(cgroupFile) if err != nil { - slog.Warn("Unable to check if cgroup file contains a container reference", "error", err) + slog.Warn("Unable to check if cgroup file contains a container reference", "error", err, logOrigin) return false } @@ -189,7 +191,7 @@ func (i *Info) getContainerID() string { }) if err != nil { - slog.Error("Could not get container ID", "error", err) + slog.Error("Could not get container ID", "error", err, logOrigin) return "" } @@ -211,7 +213,7 @@ func getContainerIDFromMountInfo(mountInfo string) (string, error) { defer func(f *os.File, fileName string) { closeErr := f.Close() if closeErr != nil { - slog.Error("Unable to close file", "file", fileName, "error", closeErr) + slog.Error("Unable to close file", "file", fileName, "error", closeErr, logOrigin) } }(mInfoFile, mountInfo) @@ -275,7 +277,7 @@ func (i *Info) getHostID(ctx context.Context) string { hostID, err := i.exec.HostID(ctx) if err != nil { - slog.WarnContext(ctx, "Unable to get host ID", "error", err) + slog.WarnContext(ctx, "Unable to get host ID", "error", err, logOrigin) return "", err } @@ -283,7 +285,7 @@ func (i *Info) getHostID(ctx context.Context) string { }) if err != nil { - slog.WarnContext(ctx, "Unable to get host ID", "error", err) + slog.WarnContext(ctx, "Unable to get host ID", "error", err, logOrigin) return "" } @@ -298,7 +300,7 @@ func (i *Info) getReleaseInfo(ctx context.Context, osReleaseLocation string) (re hostReleaseInfo := i.exec.ReleaseInfo(ctx) osRelease, err := readOsRelease(osReleaseLocation) if err != nil { - slog.WarnContext(ctx, "Unable to read from os release file", "error", err) + slog.WarnContext(ctx, "Unable to read from os release file", "error", err, logOrigin) return hostReleaseInfo } @@ -314,7 +316,7 @@ func readOsRelease(path string) (map[string]string, error) { defer func(f *os.File, fileName string) { closeErr := f.Close() if closeErr != nil { - slog.Error("Unable to close file", "file", fileName, "error", closeErr) + slog.Error("Unable to close file", "file", fileName, "error", closeErr, logOrigin) } }(f, path) diff --git a/internal/datasource/nginx/log_tailer.go b/internal/datasource/nginx/log_tailer.go index 1b7d73bbf5..6381e5fc37 100644 --- a/internal/datasource/nginx/log_tailer.go +++ b/internal/datasource/nginx/log_tailer.go @@ -18,6 +18,8 @@ import ( const numberOfTimesToSplitLogLine = 2 +var logOrigin = slog.String("log_origin", "log_tailer.go") + var tailConfig = tail.Config{ Follow: true, ReOpen: true, @@ -215,9 +217,9 @@ func handleContextDone(ctx context.Context) { ctxErr := ctx.Err() switch ctxErr { case context.DeadlineExceeded: - slog.DebugContext(ctx, "Tailer canceled because deadline was exceeded", "error", ctxErr) + slog.DebugContext(ctx, "Tailer canceled because deadline was exceeded", "error", ctxErr, logOrigin) case context.Canceled: - slog.DebugContext(ctx, "Tailer forcibly canceled", "error", ctxErr) + slog.DebugContext(ctx, "Tailer forcibly canceled", "error", ctxErr, logOrigin) } - slog.DebugContext(ctx, "Tailer is done") + slog.DebugContext(ctx, "Tailer is done", logOrigin) } diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 7774bad75b..f7576382a0 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -44,6 +44,8 @@ const ( filePerm = 0o600 ) +var managerLogOrigin = slog.String("log_origin", "file_manager_service.go") + var ( manifestDirPath = "/var/lib/nginx-agent" manifestFilePath = manifestDirPath + "/manifest.json" @@ -143,9 +145,11 @@ func (fms *FileManagerService) UpdateOverview( slog.InfoContext(newCtx, "Updating file overview", "instance_id", request.GetOverview().GetConfigVersion().GetInstanceId(), "parent_correlation_id", correlationID, + managerLogOrigin, ) slog.DebugContext(newCtx, "Sending update overview request", "request", request, "parent_correlation_id", correlationID, + managerLogOrigin, ) response, updateError := fms.fileServiceClient.UpdateOverview(newCtx, request) @@ -153,7 +157,7 @@ func (fms *FileManagerService) UpdateOverview( validatedError := grpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError) + slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError, managerLogOrigin) return nil, validatedError } @@ -169,10 +173,10 @@ func (fms *FileManagerService) UpdateOverview( return err } - slog.DebugContext(newCtx, "UpdateOverview response", "response", response) + slog.DebugContext(newCtx, "UpdateOverview response", "response", response, managerLogOrigin) if response.GetOverview() == nil { - slog.Debug("UpdateOverview response is empty") + slog.Debug("UpdateOverview response is empty", managerLogOrigin) return nil } delta := files.ConvertToMapOfFiles(response.GetOverview().GetFiles()) @@ -215,7 +219,7 @@ func (fms *FileManagerService) updateFiles( } iteration++ - slog.Debug("Updating file overview", "attempt_number", iteration) + slog.Debug("Updating file overview", "attempt_number", iteration, managerLogOrigin) return fms.UpdateOverview(ctx, instanceID, diffFiles, iteration) } @@ -225,7 +229,11 @@ func (fms *FileManagerService) UpdateFile( instanceID string, fileToUpdate *mpi.File, ) error { - slog.InfoContext(ctx, "Updating file", "instance_id", instanceID, "file_name", fileToUpdate.GetFileMeta().GetName()) + slog.InfoContext(ctx, "Updating file", + "instance_id", instanceID, + "file_name", fileToUpdate.GetFileMeta().GetName(), + managerLogOrigin, + ) contents, err := os.ReadFile(fileToUpdate.GetFileMeta().GetName()) if err != nil { return err @@ -249,8 +257,11 @@ func (fms *FileManagerService) UpdateFile( defer backoffCancel() sendUpdateFile := func() (*mpi.UpdateFileResponse, error) { - slog.DebugContext(ctx, "Sending update file request", "request_file", request.GetFile(), - "request_message_meta", request.GetMessageMeta()) + slog.DebugContext(ctx, "Sending update file request", + "request_file", request.GetFile(), + "request_message_meta", request.GetMessageMeta(), + managerLogOrigin, + ) if fms.fileServiceClient == nil { return nil, errors.New("file service client is not initialized") } @@ -264,7 +275,7 @@ func (fms *FileManagerService) UpdateFile( validatedError := grpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update file", "error", validatedError) + slog.ErrorContext(ctx, "Failed to send update file", "error", validatedError, managerLogOrigin) return nil, validatedError } @@ -278,7 +289,7 @@ func (fms *FileManagerService) UpdateFile( return err } - slog.DebugContext(ctx, "UpdateFile response", "response", response) + slog.DebugContext(ctx, "UpdateFile response", "response", response, managerLogOrigin) return err } @@ -341,7 +352,7 @@ func (fms *FileManagerService) ClearCache() { // nolint:revive,cyclop func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error { - slog.InfoContext(ctx, "Rolling back config for instance", "instanceid", instanceID) + slog.InfoContext(ctx, "Rolling back config for instance", "instanceid", instanceID, managerLogOrigin) areFilesUpdated := false fms.filesMutex.Lock() defer fms.filesMutex.Unlock() @@ -371,7 +382,7 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) case model.Unchanged: fallthrough default: - slog.DebugContext(ctx, "File Action not implemented") + slog.DebugContext(ctx, "File Action not implemented", managerLogOrigin) } } @@ -401,7 +412,7 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) error { return updateErr } case model.Unchanged: - slog.DebugContext(ctx, "File unchanged") + slog.DebugContext(ctx, "File unchanged", managerLogOrigin) } } diff --git a/internal/file/file_operator.go b/internal/file/file_operator.go index 06c174b900..743e81b010 100644 --- a/internal/file/file_operator.go +++ b/internal/file/file_operator.go @@ -19,6 +19,8 @@ import ( type FileOperator struct{} +var operatorLogOrigin = slog.String("log_origin", "file_operator.go") + var _ fileOperator = (*FileOperator)(nil) // FileOperator only purpose is to write files, @@ -30,7 +32,7 @@ func NewFileOperator() *FileOperator { func (fo *FileOperator) Write(ctx context.Context, fileContent []byte, file *mpi.FileMeta) error { filePermission := files.FileMode(file.GetPermissions()) if _, err := os.Stat(file.GetName()); os.IsNotExist(err) { - slog.DebugContext(ctx, "File does not exist, creating new file", "file_path", file.GetName()) + slog.DebugContext(ctx, "File does not exist, creating new file", "file_path", file.GetName(), operatorLogOrigin) err = os.MkdirAll(path.Dir(file.GetName()), filePermission) if err != nil { return fmt.Errorf("error creating directory %s: %w", path.Dir(file.GetName()), err) @@ -41,7 +43,7 @@ func (fo *FileOperator) Write(ctx context.Context, fileContent []byte, file *mpi if err != nil { return fmt.Errorf("error writing to file %s: %w", file.GetName(), err) } - slog.DebugContext(ctx, "Content written to file", "file_path", file.GetName()) + slog.DebugContext(ctx, "Content written to file", "file_path", file.GetName(), operatorLogOrigin) return nil } diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 88086473fb..118a951a18 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -22,6 +22,8 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +var pluginLogOrigin = slog.String("log_origin", "file_plugin.go") + var _ bus.Plugin = (*FilePlugin)(nil) // The file plugin only writes, deletes and checks hashes of files @@ -42,7 +44,7 @@ func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectio } func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { - slog.DebugContext(ctx, "Starting file plugin") + slog.DebugContext(ctx, "Starting file plugin", pluginLogOrigin) fp.messagePipe = messagePipe fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config) @@ -51,7 +53,7 @@ func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInter } func (fp *FilePlugin) Close(ctx context.Context) error { - slog.InfoContext(ctx, "Closing file plugin") + slog.InfoContext(ctx, "Closing file plugin", pluginLogOrigin) return fp.conn.Close(ctx) } @@ -80,7 +82,7 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { case bus.ConfigApplyFailedTopic: fp.handleConfigApplyFailedRequest(ctx, msg) default: - slog.DebugContext(ctx, "File plugin unknown topic", "topic", msg.Topic) + slog.DebugContext(ctx, "File plugin unknown topic", "topic", msg.Topic, pluginLogOrigin) } } @@ -98,12 +100,12 @@ func (fp *FilePlugin) Subscriptions() []string { } func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received connection reset message") + slog.DebugContext(ctx, "File plugin received connection reset message", pluginLogOrigin) if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { var reconnect bool err := fp.conn.Close(ctx) if err != nil { - slog.ErrorContext(ctx, "File plugin: unable to close connection", "error", err) + slog.ErrorContext(ctx, "File plugin: unable to close connection", "error", err, pluginLogOrigin) } fp.conn = newConnection @@ -111,7 +113,7 @@ func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Messag fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config) fp.fileManagerService.SetIsConnected(reconnect) - slog.DebugContext(ctx, "File plugin: client reset successfully") + slog.DebugContext(ctx, "File plugin: client reset successfully", pluginLogOrigin) } } @@ -119,7 +121,11 @@ func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Me response, ok := msg.Data.(*mpi.DataPlaneResponse) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", + "payload", msg.Data, + pluginLogOrigin, + ) + return } @@ -143,7 +149,9 @@ func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *b data, ok := msg.Data.(*model.ConfigApplyMessage) if data.InstanceID == "" || !ok { slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", - "payload", msg.Data) + "payload", msg.Data, + pluginLogOrigin, + ) fp.fileManagerService.ClearCache() return @@ -177,7 +185,9 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) if !ok { slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", - "payload", msg.Data) + "payload", msg.Data, + pluginLogOrigin, + ) return } @@ -185,7 +195,9 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ConfigApplyRequest) if !requestOk { slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", - "payload", msg.Data) + "payload", msg.Data, + pluginLogOrigin, + ) return } @@ -220,6 +232,7 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes "Failed to apply config changes", "instance_id", instanceID, "error", err, + pluginLogOrigin, ) response = fp.createDataPlaneResponse( correlationID, @@ -239,6 +252,7 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes "Failed to apply config changes, rolling back", "instance_id", instanceID, "error", err, + pluginLogOrigin, ) response = fp.createDataPlaneResponse( @@ -293,7 +307,12 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) { nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data) + slog.ErrorContext( + ctx, + "Unable to cast message payload to *model.NginxConfigContext", + "payload", msg.Data, + pluginLogOrigin, + ) return } @@ -307,6 +326,7 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess "Failed to update file overview", "instance_id", nginxConfigContext.InstanceID, "error", err, + pluginLogOrigin, ) } } @@ -318,6 +338,7 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", "payload", msg.Data, + pluginLogOrigin, ) return @@ -342,6 +363,7 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me "instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), "file_name", file.GetFileMeta().GetName(), "error", err, + pluginLogOrigin, ) response := fp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, diff --git a/test/mock/grpc/cmd/main.go b/test/mock/grpc/cmd/main.go index 1b0fcc7615..ac03c93d46 100644 --- a/test/mock/grpc/cmd/main.go +++ b/test/mock/grpc/cmd/main.go @@ -26,6 +26,8 @@ const ( directoryPermissions = 0o700 ) +var logOrigin = slog.String("log_origin", "main.go") + var ( sleepDuration = flag.Duration("sleepDuration", defaultSleepDuration, "duration between changes in health") configDirectory = flag.String("configDirectory", "", "set the directory where the config files are stored") @@ -41,12 +43,12 @@ func main() { agentConfig := types.AgentConfig() grpcHost, grpcPort, err := net.SplitHostPort(*grpcAddress) if err != nil { - slog.ErrorContext(ctx, "Failed to read host and port", "error", err) + slog.ErrorContext(ctx, "Failed to read host and port", "error", err, logOrigin) os.Exit(1) } portInt, err := strconv.Atoi(grpcPort) if err != nil { - slog.ErrorContext(ctx, "Failed to convert port", "error", err) + slog.ErrorContext(ctx, "Failed to convert port", "error", err, logOrigin) os.Exit(1) } @@ -67,33 +69,33 @@ func main() { defaultConfigDirectory, configDirErr := generateDefaultConfigDirectory() configDirectory = &defaultConfigDirectory if configDirErr != nil { - slog.ErrorContext(ctx, "Failed to create default config directory", "error", err) + slog.ErrorContext(ctx, "Failed to create default config directory", "error", err, logOrigin) os.Exit(1) } } - slog.DebugContext(ctx, "Config directory", "directory", *configDirectory) + slog.DebugContext(ctx, "Config directory", "directory", *configDirectory, logOrigin) _, err = grpc.NewMockManagementServer(*apiAddress, agentConfig, configDirectory) if err != nil { - slog.ErrorContext(ctx, "Failed to start mock management server", "error", err) + slog.ErrorContext(ctx, "Failed to start mock management server", "error", err, logOrigin) os.Exit(1) } <-ctx.Done() } func generateDefaultConfigDirectory() (string, error) { - slog.Info("Generating default configs") + slog.Info("Generating default configs", logOrigin) tempDirectory := os.TempDir() configDirectory := filepath.Join(tempDirectory, "config") err := os.MkdirAll(configDirectory, directoryPermissions) if err != nil { - slog.Error("Failed to create directories", "error", err) + slog.Error("Failed to create directories", "error", err, logOrigin) return "", err } - slog.Info("Created default config directory", "directory", configDirectory) + slog.Info("Created default config directory", "directory", configDirectory, logOrigin) return configDirectory, nil } diff --git a/test/mock/grpc/mock_management_command_service.go b/test/mock/grpc/mock_management_command_service.go index 3cae35f435..e4ba242750 100644 --- a/test/mock/grpc/mock_management_command_service.go +++ b/test/mock/grpc/mock_management_command_service.go @@ -30,6 +30,8 @@ import ( sloggin "github.com/samber/slog-gin" ) +var commandLogOrigin = slog.String("log_origin", "mock_management_command_service.go") + type CommandService struct { mpi.UnimplementedCommandServiceServer server *gin.Engine @@ -76,10 +78,10 @@ func NewCommandService(requestChan chan *mpi.ManagementPlaneRequest, configDirec } func (cs *CommandService) StartServer(listener net.Listener) { - slog.Info("Starting mock management plane http server", "address", listener.Addr().String()) + slog.Info("Starting mock management plane http server", "address", listener.Addr().String(), commandLogOrigin) err := cs.server.RunListener(listener) if err != nil { - slog.Error("Failed to start mock management plane http server", "error", err) + slog.Error("Failed to start mock management plane http server", "error", err, commandLogOrigin) } } @@ -89,7 +91,7 @@ func (cs *CommandService) CreateConnection( *mpi.CreateConnectionResponse, error, ) { - slog.DebugContext(ctx, "Create connection request", "request", request) + slog.DebugContext(ctx, "Create connection request", "request", request, commandLogOrigin) if request == nil { return nil, errors.New("empty connection request") @@ -114,7 +116,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( *mpi.UpdateDataPlaneStatusResponse, error, ) { - slog.Debug("Update data plane status request", "request", request) + slog.Debug("Update data plane status request", "request", request, commandLogOrigin) if request == nil { return nil, errors.New("empty update data plane status request") @@ -133,7 +135,7 @@ func (cs *CommandService) UpdateDataPlaneHealth( *mpi.UpdateDataPlaneHealthResponse, error, ) { - slog.Debug("Update data plane health request", "request", request) + slog.Debug("Update data plane health request", "request", request, commandLogOrigin) if request == nil { return nil, errors.New("empty update dataplane health request") @@ -151,7 +153,7 @@ func (cs *CommandService) Subscribe(in mpi.CommandService_SubscribeServer) error go cs.listenForDataPlaneResponses(ctx, in) - slog.InfoContext(ctx, "Starting Subscribe") + slog.InfoContext(ctx, "Starting Subscribe", commandLogOrigin) for { select { @@ -160,7 +162,7 @@ func (cs *CommandService) Subscribe(in mpi.CommandService_SubscribeServer) error default: request := <-cs.requestChan - slog.InfoContext(ctx, "Subscribe", "request", request) + slog.InfoContext(ctx, "Subscribe", "request", request, commandLogOrigin) if upload, ok := request.GetRequest().(*mpi.ManagementPlaneRequest_ConfigUploadRequest); ok { cs.handleConfigUploadRequest(ctx, upload) @@ -168,7 +170,7 @@ func (cs *CommandService) Subscribe(in mpi.CommandService_SubscribeServer) error err := in.Send(request) if err != nil { - slog.ErrorContext(ctx, "Failed to send management request", "error", err) + slog.ErrorContext(ctx, "Failed to send management request", "error", err, commandLogOrigin) } } } @@ -188,7 +190,7 @@ func (cs *CommandService) handleConfigUploadRequest( for _, fileToDelete := range filesToDelete { err := os.Remove(fileToDelete) if err != nil { - slog.ErrorContext(ctx, "Failed to delete file", "error", err, "path", fileToDelete) + slog.ErrorContext(ctx, "Failed to delete file", "error", err, "path", fileToDelete, commandLogOrigin) } } } @@ -223,9 +225,12 @@ func (cs *CommandService) listenForDataPlaneResponses(ctx context.Context, in mp return default: dataPlaneResponse, err := in.Recv() - slog.DebugContext(ctx, "Received data plane response", "data_plane_response", dataPlaneResponse) + slog.DebugContext(ctx, "Received data plane response", + "data_plane_response", dataPlaneResponse, + commandLogOrigin, + ) if err != nil { - slog.ErrorContext(ctx, "Failed to receive data plane response", "error", err) + slog.ErrorContext(ctx, "Failed to receive data plane response", "error", err, commandLogOrigin) return } cs.dataPlaneResponsesMutex.Lock() @@ -258,7 +263,7 @@ func (cs *CommandService) addConnectionEndpoint() { } else { var data map[string]interface{} if err := json.Unmarshal([]byte(protojson.Format(cs.connectionRequest)), &data); err != nil { - slog.Error("Failed to return connection", "error", err) + slog.Error("Failed to return connection", "error", err, commandLogOrigin) c.JSON(http.StatusInternalServerError, nil) } c.JSON(http.StatusOK, data) @@ -276,7 +281,7 @@ func (cs *CommandService) addStatusEndpoint() { } else { var data map[string]interface{} if err := json.Unmarshal([]byte(protojson.Format(cs.updateDataPlaneStatusRequest)), &data); err != nil { - slog.Error("Failed to return status", "error", err) + slog.Error("Failed to return status", "error", err, commandLogOrigin) c.JSON(http.StatusInternalServerError, nil) } c.JSON(http.StatusOK, data) @@ -294,7 +299,7 @@ func (cs *CommandService) addHealthEndpoint() { } else { var data map[string]interface{} if err := json.Unmarshal([]byte(protojson.Format(cs.updateDataPlaneHealthRequest)), &data); err != nil { - slog.Error("Failed to return data plane health", "error", err) + slog.Error("Failed to return data plane health", "error", err, commandLogOrigin) c.JSON(http.StatusInternalServerError, nil) } c.JSON(http.StatusOK, data) @@ -326,9 +331,9 @@ func (cs *CommandService) addResponseAndRequestEndpoints() { cs.server.POST("/api/v1/requests", func(c *gin.Context) { request := mpi.ManagementPlaneRequest{} body, err := io.ReadAll(c.Request.Body) - slog.Debug("Received request", "body", body) + slog.Debug("Received request", "body", body, commandLogOrigin) if err != nil { - slog.Error("Error reading request body", "err", err) + slog.Error("Error reading request body", "err", err, commandLogOrigin) c.JSON(http.StatusBadRequest, err) return @@ -337,7 +342,7 @@ func (cs *CommandService) addResponseAndRequestEndpoints() { pb := protojson.UnmarshalOptions{DiscardUnknown: true, AllowPartial: true} err = pb.Unmarshal(body, &request) if err != nil { - slog.Error("Error unmarshalling request body", "err", err) + slog.Error("Error unmarshalling request body", "err", err, commandLogOrigin) c.JSON(http.StatusBadRequest, err) return @@ -395,7 +400,7 @@ func (cs *CommandService) findInstanceConfigFiles(instanceID string) (configFile } if isValidFile(info, path) { - slog.Debug("Found file", "path", path) + slog.Debug("Found file", "path", path, commandLogOrigin) filePath := strings.Split(path, instanceDirectory)[1] @@ -408,6 +413,7 @@ func (cs *CommandService) findInstanceConfigFiles(instanceID string) (configFile "File found:", "path", file.GetFileMeta().GetName(), "hash", file.GetFileMeta().GetHash(), + commandLogOrigin, ) configFiles = append(configFiles, file) diff --git a/test/mock/grpc/mock_management_file_service.go b/test/mock/grpc/mock_management_file_service.go index 3e42af7049..e732c442c7 100644 --- a/test/mock/grpc/mock_management_file_service.go +++ b/test/mock/grpc/mock_management_file_service.go @@ -21,6 +21,8 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +var fileLogOrigin = slog.String("log_origin", "mock_management_file_service.go") + const defaultFilePermissions = 0o644 type FileService struct { @@ -44,10 +46,10 @@ func (mgs *FileService) GetOverview( ) (*v1.GetOverviewResponse, error) { configVersion := request.GetConfigVersion() - slog.Info("Getting overview", "config_version", configVersion) + slog.Info("Getting overview", "config_version", configVersion, fileLogOrigin) if _, ok := mgs.instanceFiles[request.GetConfigVersion().GetInstanceId()]; !ok { - slog.Error("Config version not found", "config_version", configVersion) + slog.Error("Config version not found", "config_version", configVersion, fileLogOrigin) return nil, status.Errorf(codes.NotFound, "Config version not found") } @@ -70,7 +72,7 @@ func (mgs *FileService) UpdateOverview( if errMarshaledJSON != nil { return nil, fmt.Errorf("failed to marshal struct back to JSON: %w", errMarshaledJSON) } - slog.Info("Updating overview JSON", "overview", marshaledJSON) + slog.Info("Updating overview JSON", "overview", marshaledJSON, fileLogOrigin) mgs.instanceFiles[overview.GetConfigVersion().GetInstanceId()] = overview.GetFiles() @@ -98,18 +100,18 @@ func (mgs *FileService) GetFile( fileName := request.GetFileMeta().GetName() fileHash := request.GetFileMeta().GetHash() - slog.Info("Getting file", "name", fileName, "hash", fileHash) + slog.Info("Getting file", "name", fileName, "hash", fileHash, fileLogOrigin) fullFilePath := mgs.findFile(request.GetFileMeta()) if fullFilePath == "" { - slog.Error("File not found", "file_name", fileName) + slog.Error("File not found", "file_name", fileName, fileLogOrigin) return nil, status.Errorf(codes.NotFound, "File not found") } bytes, err := os.ReadFile(fullFilePath) if err != nil { - slog.Error("Failed to get file contents", "full_file_path", fullFilePath, "error", err) + slog.Error("Failed to get file contents", "full_file_path", fullFilePath, "error", err, fileLogOrigin) return nil, status.Errorf(codes.Internal, "Failed to get file contents") } @@ -130,21 +132,21 @@ func (mgs *FileService) UpdateFile( fileHash := fileMeta.GetHash() filePermissions := fileMeta.GetPermissions() - slog.Info("Updating file", "name", fileName, "hash", fileHash) + slog.Info("Updating file", "name", fileName, "hash", fileHash, fileLogOrigin) fullFilePath := mgs.findFile(request.GetFile().GetFileMeta()) if _, err := os.Stat(fullFilePath); os.IsNotExist(err) { statErr := os.MkdirAll(filepath.Dir(fullFilePath), os.ModePerm) if statErr != nil { - slog.Info("Failed to create/update file", "full_file_path", fullFilePath, "error", statErr) + slog.Info("Failed to create/update file", "full_file_path", fullFilePath, "error", statErr, fileLogOrigin) return nil, status.Errorf(codes.Internal, "Failed to create/update file") } } err := os.WriteFile(fullFilePath, fileContents, getFileMode(filePermissions)) if err != nil { - slog.Info("Failed to create/update file", "full_file_path", fullFilePath, "error", err) + slog.Info("Failed to create/update file", "full_file_path", fullFilePath, "error", err, fileLogOrigin) return nil, status.Errorf(codes.Internal, "Failed to create/update file") } diff --git a/test/mock/grpc/mock_management_server.go b/test/mock/grpc/mock_management_server.go index 8459c71da1..2042df4152 100644 --- a/test/mock/grpc/mock_management_server.go +++ b/test/mock/grpc/mock_management_server.go @@ -41,6 +41,8 @@ const ( connectionType = "tcp" ) +var serverLogOrigin = slog.String("log_origin", "mock_management_server.go") + var ( commandServiceLock sync.Mutex fileServiceLock sync.Mutex @@ -82,7 +84,7 @@ func NewMockManagementServer( grpcListener, err := net.Listen(connectionType, fmt.Sprintf("%s:%d", agentConfig.Command.Server.Host, agentConfig.Command.Server.Port)) if err != nil { - slog.Error("Failed to listen", "error", err) + slog.Error("Failed to listen", "error", err, serverLogOrigin) return nil, err } @@ -96,10 +98,13 @@ func NewMockManagementServer( go reportHealth(healthcheck, agentConfig) go func() { - slog.Info("Starting mock management plane gRPC server", "address", grpcListener.Addr().String()) + slog.Info("Starting mock management plane gRPC server", + "address", grpcListener.Addr().String(), + serverLogOrigin, + ) grpcErr := grpcServer.Serve(grpcListener) if grpcErr != nil { - slog.Error("Failed to start mock management plane gRPC server", "error", grpcErr) + slog.Error("Failed to start mock management plane gRPC server", "error", grpcErr, serverLogOrigin) } }() @@ -202,7 +207,7 @@ func createListener(apiAddress string, agentConfig *config.Config) (net.Listener cert, keyPairErr := tls.LoadX509KeyPair(agentConfig.Command.TLS.Cert, agentConfig.Command.TLS.Key) if keyPairErr == nil { - slog.Error("Failed to load key and cert pair", "error", keyPairErr) + slog.Error("Failed to load key and cert pair", "error", keyPairErr, serverLogOrigin) return tls.Listen(connectionType, apiAddress, &tls.Config{ Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12, diff --git a/test/protos/common.go b/test/protos/common.go index 79d79a984f..d12eef62f0 100644 --- a/test/protos/common.go +++ b/test/protos/common.go @@ -14,12 +14,14 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +var logOrigin = slog.String("log_origin", "common.go") + const messageID = "964e1e51-44cc-4c55-8422-2a3205bdfc2f" func CreateProtoTime(timeString string) (*timestamppb.Timestamp, error) { newTime, err := time.Parse(time.RFC3339, timeString) if err != nil { - slog.Error("failed to parse time") + slog.Error("failed to parse time", logOrigin) return timestamppb.Now(), err }