Skip to content

Update some internal logs #1060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: v3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ linters-settings:
- github.com/jmoiron/sqlx
sloglint:
# Enforce not mixing key-value pairs and attributes. Default: true
no-mixed-args: true
no-mixed-args: false
# Enforce using key-value pairs only (overrides no-mixed-args, incompatible with attr-only). Default: false
kv-only: false
# Enforce using attributes only (overrides no-mixed-args, incompatible with kv-only). Default: false
Expand Down
7 changes: 5 additions & 2 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/nginx/agent/v3/internal/config"
)

var logOrigin = slog.String("log_origin", "main.go")

var (
// set at buildtime
commit = ""
Expand All @@ -32,7 +34,7 @@ func main() {
go func() {
select {
case <-sigChan:
slog.WarnContext(ctx, "NGINX Agent exiting")
slog.WarnContext(ctx, "NGINX Agent exiting", logOrigin)
cancel()

time.Sleep(config.DefGracefulShutdownPeriod)
Expand All @@ -41,6 +43,7 @@ func main() {
"Failed to gracefully shutdown within timeout of %v. Exiting",
config.DefGracefulShutdownPeriod,
),
logOrigin,
)
os.Exit(1)
case <-ctx.Done():
Expand All @@ -51,6 +54,6 @@ func main() {

err := app.Run(ctx)
if err != nil {
slog.ErrorContext(ctx, "NGINX Agent exiting due to error", "error", err)
slog.ErrorContext(ctx, "NGINX Agent exiting due to error", "error", err, logOrigin)
}
}
9 changes: 6 additions & 3 deletions internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/spf13/cobra"
)

var logOrigin = slog.String("log_origin", "app.go")

const (
defaultMessagePipeChannelSize = 100
defaultQueueSize = 100
Expand All @@ -35,25 +37,26 @@ func (a *App) Run(ctx context.Context) error {
config.RegisterRunner(func(_ *cobra.Command, _ []string) {
err := config.RegisterConfigFile()
if err != nil {
slog.ErrorContext(ctx, "Failed to load configuration file", "error", err)
slog.ErrorContext(ctx, "Failed to load configuration file", "error", err, logOrigin)
return
}

agentConfig, err := config.ResolveConfig()
if err != nil {
slog.ErrorContext(ctx, "Invalid config", "error", err)
slog.ErrorContext(ctx, "Invalid config", "error", err, logOrigin)
return
}

slog.InfoContext(ctx, "Starting NGINX Agent",
slog.String("version", a.version),
slog.String("commit", a.commit),
logOrigin,
)

messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize)
err = messagePipe.Register(defaultQueueSize, plugin.LoadPlugins(ctx, agentConfig))
if err != nil {
slog.ErrorContext(ctx, "Failed to register plugins", "error", err)
slog.ErrorContext(ctx, "Failed to register plugins", "error", err, logOrigin)
return
}

Expand Down
7 changes: 5 additions & 2 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
messagebus "github.com/vardius/message-bus"
)

var logOrigin = slog.String("log_origin", "message_pipe.go")

type (
Payload interface{}

Expand Down Expand Up @@ -81,7 +83,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin) error {
pluginsRegistered = append(pluginsRegistered, plugin.Info().Name)
}

slog.Info("Finished registering plugins", "plugins", pluginsRegistered)
slog.Info("Finished registering plugins", "plugins", pluginsRegistered, logOrigin)

return nil
}
Expand Down Expand Up @@ -195,7 +197,7 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
for index, plugin := range p.plugins {
err := plugin.Init(ctx, p)
if err != nil {
slog.ErrorContext(ctx, "Failed to initialize plugin", "plugin", plugin.Info().Name, "error", err)
slog.ErrorContext(ctx, "Failed to initialize plugin", "plugin", plugin.Info().Name, "error", err, logOrigin)

unsubscribeError := p.unsubscribePlugin(ctx, index, plugin)
if unsubscribeError != nil {
Expand All @@ -204,6 +206,7 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
"Failed to unsubscribe plugin",
"plugin", plugin.Info().Name,
"error", unsubscribeError,
logOrigin,
)
}
}
Expand Down
65 changes: 39 additions & 26 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type (
}
)

var pluginLogOrigin = slog.String("log_origin", "otel_collector_plugin.go")

var (
_ bus.Plugin = (*Collector)(nil)
initMutex = &sync.Mutex{}
Expand Down Expand Up @@ -101,14 +103,16 @@ func (oc *Collector) GetState() otelcol.State {

// Init initializes and starts the plugin
func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) error {
slog.InfoContext(ctx, "Starting OTel Collector plugin")
slog.InfoContext(ctx, "Starting OTel Collector plugin", pluginLogOrigin)

var runCtx context.Context
runCtx, oc.cancel = context.WithCancel(ctx)

if !oc.config.AreReceiversConfigured() {
slog.InfoContext(runCtx, "No receivers configured for OTel Collector. "+
"Waiting to discover a receiver before starting OTel collector.")
"Waiting to discover a receiver before starting OTel collector.",
pluginLogOrigin,
)

return nil
}
Expand All @@ -128,7 +132,7 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro

bootErr := oc.bootup(runCtx)
if bootErr != nil {
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr)
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr, pluginLogOrigin)
}

return nil
Expand All @@ -138,37 +142,40 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) {
for _, receiver := range receivers {
if receiver.OtlpTLSConfig == nil {
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.",
pluginLogOrigin)

continue
}

if receiver.OtlpTLSConfig.GenerateSelfSignedCert {
slog.WarnContext(ctx,
"Self-signed certificate for OTel receiver requested, "+
"this is not recommended for production environments.",
pluginLogOrigin,
)

if receiver.OtlpTLSConfig.ExistingCert {
slog.WarnContext(ctx,
"Certificate file already exists, skipping self-signed certificate generation",
)
"Certificate file already exists, skipping self-signed certificate generation", pluginLogOrigin)
}
} else {
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.",
pluginLogOrigin)
}
}
}

func (oc *Collector) bootup(ctx context.Context) error {
slog.InfoContext(ctx, "Starting OTel collector")
slog.InfoContext(ctx, "Starting OTel collector", pluginLogOrigin)
errChan := make(chan error)

go func() {
appErr := oc.service.Run(ctx)
if appErr != nil {
errChan <- appErr
}
slog.InfoContext(ctx, "OTel collector run finished")
slog.InfoContext(ctx, "OTel collector run finished", pluginLogOrigin)
}()

for {
Expand Down Expand Up @@ -204,10 +211,10 @@ func (oc *Collector) Info() *bus.Info {

// Close the plugin.
func (oc *Collector) Close(ctx context.Context) error {
slog.InfoContext(ctx, "Closing OTel Collector plugin")
slog.InfoContext(ctx, "Closing OTel Collector plugin", pluginLogOrigin)

if !oc.stopped {
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState(), pluginLogOrigin)
oc.service.Shutdown()
oc.cancel()

Expand All @@ -222,9 +229,12 @@ func (oc *Collector) Close(ctx context.Context) error {
})

if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector",
"error", err, "state", oc.service.GetState(),
pluginLogOrigin,
)
} else {
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState(), pluginLogOrigin)
oc.stopped = true
}
}
Expand All @@ -240,7 +250,7 @@ func (oc *Collector) Process(ctx context.Context, msg *bus.Message) {
case bus.ResourceUpdateTopic:
oc.handleResourceUpdate(ctx, msg)
default:
slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic)
slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic, pluginLogOrigin)
}
}

Expand All @@ -258,17 +268,19 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa

nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data)
slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext",
"payload", msg.Data, pluginLogOrigin)

return
}

reloadCollector := oc.checkForNewReceivers(nginxConfigContext)

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
slog.InfoContext(ctx, "Reloading OTel collector config", pluginLogOrigin)
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err, pluginLogOrigin)
return
}

Expand All @@ -282,18 +294,18 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)

resourceUpdateContext, ok := msg.Data.(*v1.Resource)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data)
slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data, pluginLogOrigin)
return
}

resourceProcessorUpdated := oc.updateResourceProcessor(resourceUpdateContext)
headersSetterExtensionUpdated := oc.updateHeadersSetterExtension(ctx, resourceUpdateContext)

if resourceProcessorUpdated || headersSetterExtensionUpdated {
slog.InfoContext(ctx, "Reloading OTel collector config")
slog.InfoContext(ctx, "Reloading OTel collector config", pluginLogOrigin)
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err, pluginLogOrigin)
return
}

Expand Down Expand Up @@ -343,9 +355,9 @@ func (oc *Collector) updateHeadersSetterExtension(
}

if !isUUIDHeaderSet {
slog.DebugContext(
ctx, "Adding uuid header to OTel collector",
slog.DebugContext(ctx, "Adding uuid header to OTel collector",
"uuid", resourceUpdateContext.GetResourceId(),
pluginLogOrigin,
)
oc.config.Collector.Extensions.HeadersSetter.Headers = append(
oc.config.Collector.Extensions.HeadersSetter.Headers,
Expand All @@ -366,14 +378,14 @@ func (oc *Collector) updateHeadersSetterExtension(
func (oc *Collector) restartCollector(ctx context.Context) {
err := oc.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, pluginLogOrigin)
return
}

settings := OTelCollectorSettings(oc.config)
oTelCollector, err := otelcol.NewCollector(settings)
if err != nil {
slog.ErrorContext(ctx, "Failed to create OTel Collector", "error", err)
slog.ErrorContext(ctx, "Failed to create OTel Collector", "error", err, pluginLogOrigin)
return
}
oc.service = oTelCollector
Expand All @@ -382,13 +394,14 @@ func (oc *Collector) restartCollector(ctx context.Context) {
runCtx, oc.cancel = context.WithCancel(ctx)

if !oc.stopped {
slog.ErrorContext(ctx, "Unable to restart OTel collector, failed to stop collector")
slog.ErrorContext(ctx, "Unable to restart OTel collector, failed to stop collector", pluginLogOrigin)

return
}

bootErr := oc.bootup(runCtx)
if bootErr != nil {
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr)
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr, pluginLogOrigin)
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/collector/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
configFilePermission = 0o600
)

var settingsLogOrigin = slog.String("log_origin", "settings.go")

//go:embed otelcol.tmpl
var otelcolTemplate string

Expand Down Expand Up @@ -115,7 +117,7 @@ func writeCollectorConfig(conf *config.Collector) error {
defer func() {
err = file.Close()
if err != nil {
slog.Warn("Failed to close file", "file_path", confPath)
slog.Warn("Failed to close file", "file_path", confPath, settingsLogOrigin)
}
}()
if err != nil {
Expand Down
Loading