diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index e96e96d08..aca620313 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icinga-go-library/utils" @@ -15,6 +16,7 @@ import ( v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/notifications" "github.com/okzk/sdnotify" "github.com/pkg/errors" "go.uber.org/zap" @@ -168,13 +170,44 @@ func run() int { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) - go func() { - logger.Info("Starting history sync") + { + var ( + callbackName string + callbackKeyStructPtr map[string]any + callbackFn func(database.Entity) bool + ) - if err := hs.Sync(ctx); err != nil && !utils.IsContextCanceled(err) { - logger.Fatalf("%+v", err) + if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" { + logger.Info("Starting Icinga Notifications source") + + notificationsSource, err := notifications.NewNotificationsClient( + ctx, + db, + rc, + logs.GetChildLogger("notifications-source"), + cfg) + if err != nil { + logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err)) + } + + callbackName = "notifications_sync" + callbackKeyStructPtr = notifications.SyncKeyStructPtrs + callbackFn = notificationsSource.Submit } - }() + + go func() { + logger.Info("Starting history sync") + + if err := hs.Sync( + ctx, + callbackName, + callbackKeyStructPtr, + callbackFn, + ); err != nil && !utils.IsContextCanceled(err) { + logger.Fatalf("%+v", err) + } + }() + } // Main loop for { diff --git a/config.example.yml b/config.example.yml index a8e66b9a8..de5706abf 100644 --- a/config.example.yml +++ b/config.example.yml @@ -139,3 +139,15 @@ redis: # flapping: # notification: # state: + +# Icinga DB can act as an event source for Icinga Notifications. If the following block is not empty, Icinga DB will +# submit events to the Icinga Notifications API. +# notifications-source: +# # URL to the API root. +# api-base-url: http://localhost:5680 +# +# Username to authenticate against the Icinga Notifications API. +# username: icingadb +# +# # Password for the defined user. +# password: insecureinsecure diff --git a/doc/01-About.md b/doc/01-About.md index c880ac00c..fea23d846 100644 --- a/doc/01-About.md +++ b/doc/01-About.md @@ -34,6 +34,10 @@ Icinga DB Web also connects to the Icinga 2 API with its Command Transport to ac These are the components of Icinga DB embedded into an Icinga setup with Icinga 2 and Icinga Web 2. +Since the Icinga DB daemon always receives the latest information from RedisĀ®, it is an ideal candidate to distribute information further. +In addition to inserting data into a relational database, Icinga DB can also forward events to [Icinga Notifications](https://icinga.com/docs/icinga-notifications/), +as described in the [configuration section](03-Configuration.md#notifications-source-configuration). + ## Installation To install Icinga DB see [Installation](02-Installation.md). 
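The `cmd/icingadb/main.go` hunk above only wires the notifications callback when `notifications-source.api-base-url` is configured; otherwise the three callback arguments stay zero-valued and `hs.Sync` runs without a callback stage. A condensed, standalone sketch of that optional-callback pattern (placeholder types and names, not the actual Icinga DB APIs):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// entity stands in for database.Entity; every identifier in this sketch is a placeholder.
type entity any

// syncHistory mirrors the shape of Sync.Sync: the callback triple is optional,
// but must be either fully set or fully unset.
func syncHistory(ctx context.Context, name string, keys map[string]any, fn func(entity) bool) error {
	if (keys == nil) != (fn == nil) {
		return errors.New("callback key map and callback function must both be set or both be unset")
	}
	if keys != nil && name == "" {
		return errors.New("a callback name is required when a callback is configured")
	}
	// ... run the per-stream pipelines, appending a callback stage for every key found in keys ...
	return nil
}

func main() {
	var (
		name string
		keys map[string]any
		fn   func(entity) bool
	)

	// Mirrors the cfg.ApiBaseUrl check above: leave the triple empty to run the
	// plain history sync, fill it to enable the integration. The real code maps
	// stream keys to struct pointers (SyncKeyStructPtrs) instead of nil values.
	if apiBaseURL := "http://localhost:5680"; apiBaseURL != "" {
		name = "notifications_sync"
		keys = map[string]any{"state": nil, "downtime": nil}
		fn = func(entity) bool { return true } // e.g. Client.Submit
	}

	fmt.Println(syncHistory(context.Background(), name, keys, fn))
}
```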
diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index cccfd2233..a1a3a0693 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -146,7 +146,7 @@ ICINGADB_LOGGING_OPTIONS=database:error,high-availability:debug | runtime-updates | Runtime updates of config objects after the initial config synchronization. | | telemetry | Reporting of Icinga DB status to Icinga 2 via RedisĀ® (for monitoring purposes). | -## Retention +## Retention Configuration By default, no historical data is deleted, which means that the longer the data is retained, the more disk space is required to store it. History retention is an optional feature that allows to @@ -174,6 +174,20 @@ ICINGADB_RETENTION_OPTIONS=comment:356 | count | **Optional.** Number of old historical data a single query can delete in a `"DELETE FROM ... LIMIT count"` manner. Defaults to `5000`. | | options | **Optional.** Map of history category to number of days to retain its data. Available categories are `acknowledgement`, `comment`, `downtime`, `flapping`, `notification` and `state`. | +## Notifications Source Configuration + +Icinga DB can act as an event source for [Icinga Notifications](https://icinga.com/docs/icinga-notifications/). +If configured, Icinga DB will submit events to the Icinga Notifications API. + +For YAML configuration, the options are part of the `notifications-source` dictionary. +For environment variables, each option is prefixed with `ICINGADB_NOTIFICATIONS_SOURCE_`. + +| Option | Description | +|--------------|-----------------------------------------------------------------------------------| +| api-base-url | **Optional.** Icinga Notifications API base URL, such as `http://localhost:5680`. | +| username | **Optional.** Icinga Notifications API user for this source. | +| password | **Optional.** Icinga Notifications API user password. 
| + ## Appendix ### Duration String diff --git a/go.mod b/go.mod index cb6a0ac41..81b24f41d 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/goccy/go-yaml v1.13.0 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 - github.com/icinga/icinga-go-library v0.7.2 + github.com/icinga/icinga-go-library v0.7.3-0.20251022120618-6600889adc38 github.com/jessevdk/go-flags v1.6.1 github.com/jmoiron/sqlx v1.4.0 github.com/mattn/go-sqlite3 v1.14.32 @@ -34,7 +34,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.12 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/redis/go-redis/v9 v9.10.0 // indirect + github.com/redis/go-redis/v9 v9.14.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index c849198af..d544422f6 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/icinga/icinga-go-library v0.7.2 h1:6ilUeE9F9OqxxJXNR9URWDf6zOqsdhjjR9w1MUXY9Kg= -github.com/icinga/icinga-go-library v0.7.2/go.mod h1:HZTiYD+N+9FZIVpPdUEJWJnc6sLvrIRO03jvkdkmUEU= +github.com/icinga/icinga-go-library v0.7.3-0.20251022120618-6600889adc38 h1:5RNrPZCwvqm2/06i9dUCJtcBV+tR8WgUKtHne2sOaA8= +github.com/icinga/icinga-go-library v0.7.3-0.20251022120618-6600889adc38/go.mod h1:L80M/ufoqFJJjZcdnfsTp6eFl06vm3JuvSWlGcDf708= github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4= github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= @@ -63,8 +63,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= -github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ= +github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= diff --git a/internal/config/config.go b/internal/config/config.go index 503e6dbf6..359cf052e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,6 +4,7 @@ import ( "github.com/creasty/defaults" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/notifications/source" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icingadb/pkg/icingadb/history" "github.com/pkg/errors" @@ -15,10 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml" // Config defines 
Icinga DB config. type Config struct { - Database database.Config `yaml:"database" envPrefix:"DATABASE_"` - Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"` - Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"` - Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"` + Database database.Config `yaml:"database" envPrefix:"DATABASE_"` + Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"` + Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"` + Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"` + NotificationsSource source.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"` } func (c *Config) SetDefaults() { diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index 2d3a6de1a..e9d893bdc 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -230,7 +230,7 @@ func (r *Retention) Start(ctx context.Context) error { deleted, err := stmt.CleanupOlderThan( ctx, r.db, e.Id, r.count, olderThan, - database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), + database.OnSuccessIncrement[struct{}](telemetry.Stats.Get(telemetry.StatHistoryCleanup)), ) if err != nil { select { diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index c0efb1515..7a3dc3a20 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -2,6 +2,7 @@ package history import ( "context" + "fmt" "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" @@ -15,9 +16,12 @@ import ( v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "reflect" + "slices" "sync" + "time" ) // Sync specifies the source and destination of a history sync. @@ -37,7 +41,26 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync } // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. -func (s Sync) Sync(ctx context.Context) error { +// +// It is possible to enable a callback functionality, e.g., for the Icinga Notifications integration. To do so, the +// optional callbackFn and callbackKeyStructPtr must be set. Both must either be nil or not nil. If set, the additional +// callbackName must also be set, to be used in [telemetry.Stats]. +// +// The callbackKeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If +// a key is missing from the map, it will not be used for the callback. The callbackFn function shall not block. +func (s Sync) Sync( + ctx context.Context, + callbackName string, + callbackKeyStructPtr map[string]any, + callbackFn func(database.Entity) bool, +) error { + if (callbackKeyStructPtr == nil) != (callbackFn == nil) { + return fmt.Errorf("either both callbackKeyStructPtr and callbackFn must be nil or none") + } + if (callbackKeyStructPtr != nil) && (callbackName == "") { + return fmt.Errorf("if callbackKeyStructPtr and callbackFn are set, a callbackName is required") + } + g, ctx := errgroup.WithContext(ctx) for key, pipeline := range syncPipelines { @@ -62,6 +85,21 @@ func (s Sync) Sync(ctx context.Context) error { // it has processed it, even if the stage itself does not do anything with this specific entry. 
It should only // forward the entry after it has completed its own sync so that later stages can rely on previous stages being // executed successfully. + // + // If a callback exists for this key, it will be appended to the pipeline. Thus, it is executed after every + // other pipeline action, but before deleteFromRedis. + + var hasCallbackStage bool + if callbackKeyStructPtr != nil { + _, exists := callbackKeyStructPtr[key] + hasCallbackStage = exists + } + + // Shadowed variable to allow appending custom callbacks. + pipeline := pipeline + if hasCallbackStage { + pipeline = append(slices.Clip(pipeline), makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn)) + } ch := make([]chan redis.XMessage, len(pipeline)+1) for i := range ch { @@ -152,7 +190,6 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi } counter.Add(uint64(len(ids))) - telemetry.Stats.History.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } @@ -354,32 +391,214 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re })(ctx, s, key, in, out) } +// countElementStage increments the [Stats.History] counter. +// +// This stageFunc should be called last in a [syncPipeline]. Thus, it is still executed before the final +// Sync.deleteFromRedis call in Sync.Sync. Furthermore, an optional callback function will be appended after this stage, +// resulting in an incremented history state counter for synchronized history, but stalling callback actions. +func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { + defer close(out) + + for { + select { + case msg, ok := <-in: + if !ok { + return nil + } + + telemetry.Stats.Get(telemetry.StatHistory).Add(1) + out <- msg + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message. +// +// The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key. +// +// The callback call is blocking and the message will be forwarded to the out channel after the function has returned. +// Thus, please ensure this function does not block too long. +// +// If the callback function returns false, the stageFunc switches to a backlog mode, retrying the failed messages and +// every subsequent message until there are no messages left. Only after a message was successfully handled by the +// callback method, it will be forwarded to the out channel. Thus, this stage might "block" or "hold back" certain +// messages during unhappy callback times. +// +// For each successfully submitted message, the telemetry stat named after this callback is incremented. Thus, a delta +// between [telemetry.StatHistory] and this stat indicates blocking callbacks. 
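Every entry of a sync pipeline follows the same `stageFunc` contract: read from `in`, forward a message to `out` only once the stage's own work has succeeded, close `out` when `in` closes, and abort on context cancellation. `countElementStage` above is the minimal form of that contract. A self-contained sketch of the same shape, using a simplified message type instead of `redis.XMessage` and invented names:

```go
package main

import (
	"context"
	"fmt"
)

// message is a stand-in for redis.XMessage to keep this sketch self-contained.
type message struct {
	ID     string
	Values map[string]any
}

// stage mirrors the stageFunc contract from pkg/icingadb/history/sync.go:
// consume from in, forward to out only after the stage's own work succeeded,
// close out when in closes, and give up once ctx is done.
type stage func(ctx context.Context, in <-chan message, out chan<- message) error

// countingStage has the shape of countElementStage: a pure pass-through that
// bumps a counter for every forwarded message.
func countingStage(counter *uint64) stage {
	return func(ctx context.Context, in <-chan message, out chan<- message) error {
		defer close(out)
		for {
			select {
			case msg, ok := <-in:
				if !ok {
					return nil
				}
				(*counter)++
				out <- msg
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

func main() {
	in := make(chan message, 2)
	out := make(chan message, 2)
	in <- message{ID: "1-0"}
	in <- message{ID: "2-0"}
	close(in)

	var processed uint64
	if err := countingStage(&processed)(context.Background(), in, out); err != nil {
		fmt.Println(err)
	}
	fmt.Println(processed, len(out)) // 2 2
}
```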
+func makeCallbackStageFunc( + name string, + keyStructPtrs map[string]any, + fn func(database.Entity) bool, +) stageFunc { + return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { + defer close(out) + + structPtr, ok := keyStructPtrs[key] + if !ok { + return fmt.Errorf("can't lookup struct pointer for key %q", key) + } + + structifier := structify.MakeMapStructifier( + reflect.TypeOf(structPtr).Elem(), + "json", + contracts.SafeInit) + + makeEntity := func(values map[string]interface{}) (database.Entity, error) { + val, err := structifier(values) + if err != nil { + return nil, errors.Wrapf(err, "can't structify values %#v for %q", values, key) + } + + entity, ok := val.(database.Entity) + if !ok { + return nil, fmt.Errorf("structifier returned %T which does not implement database.Entity", val) + } + + return entity, nil + } + + backlogLastId := "" + backlogMsgCounter := 0 + + const backlogTimerMinInterval, backlogTimerMaxInterval = time.Millisecond, time.Minute + backlogTimerInterval := backlogTimerMinInterval + backlogTimer := time.NewTimer(backlogTimerInterval) + _ = backlogTimer.Stop() + + for { + select { + case msg, ok := <-in: + if !ok { + return nil + } + + // Only submit the entity directly if there is no backlog. + // The second check covers a potential corner case if the XRANGE below races this stream. + if backlogLastId != "" && backlogLastId != msg.ID { + continue + } + + entity, err := makeEntity(msg.Values) + if err != nil { + return err + } + + if fn(entity) { + out <- msg + telemetry.Stats.Get(name).Add(1) + backlogLastId = "" + } else { + backlogLastId = msg.ID + backlogMsgCounter = 0 + backlogTimerInterval = backlogTimerMinInterval + _ = backlogTimer.Reset(backlogTimerInterval) + s.logger.Warnw("Failed to submit entity to callback, entering into backlog", + zap.String("key", key), + zap.String("id", backlogLastId)) + } + + case <-backlogTimer.C: + if backlogLastId == "" { // Should never happen. 
+ return fmt.Errorf("backlog timer logic for %q was called while backlogLastId was empty", key) + } + + logger := s.logger.With( + zap.String("key", key), + zap.String("last-id", backlogLastId)) + + logger.Debug("Trying to advance backlog of callback elements") + + xrangeCmd := s.redis.XRangeN(ctx, "icinga:history:stream:"+key, backlogLastId, "+", 2) + msgs, err := xrangeCmd.Result() + if err != nil { + return errors.Wrapf(err, "XRANGE %q to %q on stream %q failed", backlogLastId, "+", key) + } + + if len(msgs) < 1 || len(msgs) > 2 { + return fmt.Errorf("XRANGE %q to %q on stream %q returned %d messages, not 1 or 2", + backlogLastId, "+", key, len(msgs)) + } + + msg := msgs[0] + entity, err := makeEntity(msg.Values) + if err != nil { + return errors.Wrapf(err, "can't structify backlog value %q for %q", backlogLastId, key) + } + + if fn(entity) { + out <- msg + backlogMsgCounter++ + telemetry.Stats.Get(name).Add(1) + + if len(msgs) == 1 { + backlogLastId = "" + logger.Infow("Finished rolling back backlog of callback elements", zap.Int("elements", backlogMsgCounter)) + } else { + backlogLastId = msgs[1].ID + backlogTimerInterval = backlogTimerMinInterval + _ = backlogTimer.Reset(backlogTimerInterval) + logger.Debugw("Advanced backlog", + zap.String("new-last-id", backlogLastId), + zap.Duration("delay", backlogTimerInterval)) + } + } else { + backlogTimerInterval = min(backlogTimerMaxInterval, backlogTimerInterval*2) + _ = backlogTimer.Reset(backlogTimerInterval) + logger.Warnw("Failed to roll back callback elements", zap.Duration("delay", backlogTimerInterval)) + } + + case <-ctx.Done(): + return ctx.Err() + } + } + } +} + +const ( + SyncPipelineAcknowledgement = "acknowledgement" + SyncPipelineComment = "comment" + SyncPipelineDowntime = "downtime" + SyncPipelineFlapping = "flapping" + SyncPipelineNotification = "notification" + SyncPipelineState = "state" +) + var syncPipelines = map[string][]stageFunc{ - "notification": { - writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history - userNotificationStage, // user_notification_history (depends on notification_history) - writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + SyncPipelineAcknowledgement: { + writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history + writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) + countElementStage, }, - "state": { - writeOneEntityStage((*v1.StateHistory)(nil)), // state_history - writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) - writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state + SyncPipelineComment: { + writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history + writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) + countElementStage, }, - "downtime": { + SyncPipelineDowntime: { writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history) writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime + countElementStage, }, - "comment": { - writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history - writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) - }, - "flapping": { + SyncPipelineFlapping: { writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history 
(depends on flapping_history) + countElementStage, }, - "acknowledgement": { - writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history - writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) + SyncPipelineNotification: { + writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history + userNotificationStage, // user_notification_history (depends on notification_history) + writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + countElementStage, + }, + SyncPipelineState: { + writeOneEntityStage((*v1.StateHistory)(nil)), // state_history + writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) + writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state + countElementStage, }, } diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index 65e8bcce2..6ce3e4190 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -204,7 +204,7 @@ func (s Sync) updateOverdue( } counter.Add(uint64(len(ids))) - telemetry.Stats.Overdue.Add(uint64(len(ids))) + telemetry.Stats.Get(telemetry.StatOverdue).Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...any) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index c84052fd9..9153eec83 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -184,7 +184,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvStmt, cvCount, sem, customvars, database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter), - database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config), + database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)), ) }) @@ -204,7 +204,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter), - database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config), + database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)), ) }) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index b353c7848..73e4d2e39 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -225,8 +225,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { func getCounterForEntity(e database.Entity) *com.Counter { switch e.(type) { case *v1.HostState, *v1.ServiceState: - return &telemetry.Stats.State + return telemetry.Stats.Get(telemetry.StatState) default: - return &telemetry.Stats.Config + return telemetry.Stats.Get(telemetry.StatConfig) } } diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go index 5860d4c1d..0ca44c67d 100644 --- a/pkg/icingadb/v1/history/downtime.go +++ b/pkg/icingadb/v1/history/downtime.go @@ -81,6 +81,15 @@ func (*HistoryDowntime) TableName() string { return "history" } +// DowntimeHistoryMeta is a combined HistoryMeta struct of DowntimeHistoryEntity and DowntimeHistory. +// +// It is used in the notifications package and became necessary as values of both structs were required. 
+type DowntimeHistoryMeta struct { + DowntimeHistoryEntity `json:",inline"` + DowntimeHistory `json:",inline"` + HistoryMeta `json:",inline"` +} + type SlaHistoryDowntime struct { DowntimeHistoryEntity `json:",inline"` HistoryTableMeta `json:",inline"` diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go index 78f5c5c67..bbe8c03cb 100644 --- a/pkg/icingaredis/telemetry/stats.go +++ b/pkg/icingaredis/telemetry/stats.go @@ -2,34 +2,74 @@ package telemetry import ( "context" + "fmt" "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icinga-go-library/utils" "go.uber.org/zap" + "iter" "strconv" + "sync" "time" ) -var Stats struct { - // Config & co. are to be increased by the T sync once for every T object synced. - Config, State, History, Overdue, HistoryCleanup com.Counter +// StatsKeeper holds multiple [com.Counter] values by name, to be used for statistics in WriteStats. +type StatsKeeper struct { + m sync.Map } -// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. -func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) { - counters := map[string]*com.Counter{ - "config_sync": &Stats.Config, - "state_sync": &Stats.State, - "history_sync": &Stats.History, - "overdue_sync": &Stats.Overdue, - "history_cleanup": &Stats.HistoryCleanup, +// Get or create a [com.Counter] by its name. +func (statsKeeper *StatsKeeper) Get(key string) *com.Counter { + ctrAny, _ := statsKeeper.m.LoadOrStore(key, &com.Counter{}) + + ctr, ok := ctrAny.(*com.Counter) + if !ok { + // Should not happen unless someone messes with the internal map. + panic(fmt.Sprintf( + "StatsKeeper.Get(%q) returned something of type %T, not *com.Counter", + key, ctrAny)) } + return ctr +} + +// Iterator over all keys and their [com.Counter]. +func (statsKeeper *StatsKeeper) Iterator() iter.Seq2[string, *com.Counter] { + return func(yield func(string, *com.Counter) bool) { + statsKeeper.m.Range(func(keyAny, ctrAny any) bool { + key, keyOk := keyAny.(string) + ctr, ctrOk := ctrAny.(*com.Counter) + if !keyOk || !ctrOk { + // Should not happen unless someone messes with the internal map. + panic(fmt.Sprintf( + "iterating StatsKeeper on key %q got types (%T, %T), not (string, *com.Counter)", + keyAny, keyAny, ctrAny)) + } + + return yield(key, ctr) + }) + } +} + +// Stats is the singleton StatsKeeper to be used to access a [com.Counter]. +var Stats = &StatsKeeper{} + +// Keys for different well known Stats entries. +const ( + StatConfig = "config_sync" + StatState = "state_sync" + StatHistory = "history_sync" + StatOverdue = "overdue_sync" + StatHistoryCleanup = "history_cleanup" +) + +// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. 
+func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) { periodic.Start(ctx, time.Second, func(_ periodic.Tick) { var data []string - for kind, counter := range counters { + for kind, counter := range Stats.Iterator() { if cnt := counter.Reset(); cnt > 0 { data = append(data, kind, strconv.FormatUint(cnt, 10)) } diff --git a/pkg/icingaredis/telemetry/stats_test.go b/pkg/icingaredis/telemetry/stats_test.go new file mode 100644 index 000000000..4b851e989 --- /dev/null +++ b/pkg/icingaredis/telemetry/stats_test.go @@ -0,0 +1,44 @@ +package telemetry + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestStatsKeeper(t *testing.T) { + desiredState := map[string]uint64{ + "foo": 23, + "bar": 42, + "baz": 0, + } + + stats := &StatsKeeper{} + + // Populate based on desiredState + for key, counterValue := range desiredState { + ctr := stats.Get(key) + ctr.Add(counterValue) + } + + // Check if desiredState is set + for key, counterValue := range desiredState { + ctr := stats.Get(key) + assert.Equal(t, counterValue, ctr.Val()) + } + + // Get reference, change value, compare + fooKey := "foo" + fooCtr := stats.Get(fooKey) + assert.Equal(t, desiredState[fooKey], fooCtr.Reset()) + assert.Equal(t, uint64(0), fooCtr.Val()) + assert.Equal(t, uint64(0), stats.Get(fooKey).Val()) + fooCtr.Add(desiredState[fooKey]) + assert.Equal(t, desiredState[fooKey], stats.Get(fooKey).Val()) + + // Range over + for key, ctr := range stats.Iterator() { + desired, ok := desiredState[key] + assert.True(t, ok) + assert.Equal(t, desired, ctr.Val()) + } +} diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go new file mode 100644 index 000000000..8743ed365 --- /dev/null +++ b/pkg/notifications/notifications.go @@ -0,0 +1,494 @@ +package notifications + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "sync" + + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/notifications/event" + "github.com/icinga/icinga-go-library/notifications/source" + "github.com/icinga/icinga-go-library/redis" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-go-library/utils" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/icingadb/history" + v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history" + "github.com/pkg/errors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Client is an Icinga Notifications compatible client implementation to push events to Icinga Notifications. +// +// A new Client should be created by the NewNotificationsClient function. New history entries can be submitted by +// calling the Client.Submit method. +type Client struct { + source.Config + + db *database.DB + logger *logging.Logger + + rulesInfo *source.RulesInfo // rulesInfo holds the latest rulesInfo fetched from Icinga Notifications. + + ctx context.Context + + notificationsClient *source.Client // The Icinga Notifications client used to interact with the API. + redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events. + + submissionMutex sync.Mutex // submissionMutex protects not concurrent safe struct fields in Client.Submit, i.e., rulesInfo. +} + +// NewNotificationsClient creates a new Client connected to an existing database and logger. 
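A short usage sketch for the new `StatsKeeper`, complementing `stats_test.go` above: counters are created lazily on first `Get`, concurrent callers asking for the same key share a single `com.Counter` (courtesy of `sync.Map.LoadOrStore`), and `Iterator` is what `WriteStats` ranges over. Import paths match this repository; the counter name is arbitrary:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All goroutines share the same counter instance for this key.
			telemetry.Stats.Get("notifications_sync").Add(1)
		}()
	}
	wg.Wait()

	// WriteStats iterates the same way, but calls Reset instead of Val.
	for name, counter := range telemetry.Stats.Iterator() {
		fmt.Println(name, counter.Val()) // notifications_sync 4
	}
}
```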
+func NewNotificationsClient( + ctx context.Context, + db *database.DB, + rc *redis.Client, + logger *logging.Logger, + cfg source.Config, +) (*Client, error) { + notificationsClient, err := source.NewClient(cfg, "Icinga DB "+internal.Version.Version) + if err != nil { + return nil, err + } + + return &Client{ + Config: cfg, + + db: db, + logger: logger, + + ctx: ctx, + + rulesInfo: &source.RulesInfo{}, + + notificationsClient: notificationsClient, + redisClient: rc, + }, nil +} + +// evaluateRulesForObject checks each rule against the Icinga DB SQL database and returns matching rule IDs. +// +// Within the Icinga Notifications relation database, the rules are stored in rule.object_filter as a JSON object +// created by Icinga DB Web. This object contains SQL queries with bindvars for the Icinga DB relational database, to be +// executed with the given host, service and environment IDs. If this query returns at least one row, the rule is +// considered as matching. +// +// Icinga DB Web's JSON structure is described in: +// - https://github.com/Icinga/icingadb-web/pull/1289 +// - https://github.com/Icinga/icingadb/pull/998#issuecomment-3442298348 +func (client *Client) evaluateRulesForObject(ctx context.Context, hostId, serviceId, environmentId types.Binary) ([]string, error) { + const ( + icingaDbWebRuleVersion = 1 + icingaDbWebRuleTypeAll = "all" + icingaDbWebRuleTypeHost = "host" + icingaDbWebRuleTypeService = "service" + ) + + type IcingaDbWebQuery struct { + Query string `json:"query"` + Parameters []string `json:"parameters"` + } + + type IcingaDbWebRule struct { + Version int `json:"version"` // expect icingaDbWebRuleVersion + Config struct { + Type string `json:"type"` // expect one of [all, host, service] + Filter string `json:"filter"` // Icinga DB Web filter expression + } `json:"config"` + Queries struct { + Host *IcingaDbWebQuery `json:"host"` + Service *IcingaDbWebQuery `json:"service,omitempty"` + } `json:"queries"` + } + + outRuleIds := make([]string, 0, len(client.rulesInfo.Rules)) + + for id, filterExpr := range client.rulesInfo.Rules { + if filterExpr == "" { + outRuleIds = append(outRuleIds, id) + continue + } + + var webRule IcingaDbWebRule + if err := json.Unmarshal([]byte(filterExpr), &webRule); err != nil { + return nil, errors.Wrap(err, "cannot decode rule filter expression as JSON into struct") + } + + if version := webRule.Version; version != icingaDbWebRuleVersion { + return nil, errors.Errorf("decoded rule filter expression .Version is %d, %d expected", version, icingaDbWebRuleVersion) + } + if cfgType := webRule.Config.Type; !slices.Contains( + []string{icingaDbWebRuleTypeAll, icingaDbWebRuleTypeHost, icingaDbWebRuleTypeService}, cfgType) { + return nil, errors.Errorf("decoded rule filter expression contains unsupported .Config.Type %q", cfgType) + } + if cfgType := webRule.Config.Type; cfgType != icingaDbWebRuleTypeService && webRule.Queries.Host == nil { + return nil, errors.Errorf("decoded rule filter expression for .Config.Type %q with an empty .Queries.Host", cfgType) + } + if cfgType := webRule.Config.Type; cfgType != icingaDbWebRuleTypeHost && webRule.Queries.Service == nil { + return nil, errors.Errorf("decoded rule filter expression for .Config.Type %q with an empty .Queries.Service", cfgType) + } + + var webQuery IcingaDbWebQuery + if !serviceId.Valid() { + if webRule.Config.Type == icingaDbWebRuleTypeService { + continue + } + webQuery = *webRule.Queries.Host + } else { + if webRule.Config.Type == icingaDbWebRuleTypeHost { + continue + } + webQuery 
= *webRule.Queries.Service + } + + queryArgs := make([]any, 0, len(webQuery.Parameters)) + for _, param := range webQuery.Parameters { + switch param { + case ":host_id": + queryArgs = append(queryArgs, hostId.String()) + case ":service_id": + if !serviceId.Valid() { + return nil, errors.New("host rule filter expression contains :service_id for replacement") + } + queryArgs = append(queryArgs, serviceId.String()) + case ":environment_id": + queryArgs = append(queryArgs, environmentId.String()) + default: + queryArgs = append(queryArgs, param) + } + } + + evaluates, err := func() (bool, error) { + rows, err := client.db.QueryContext(ctx, client.db.Rebind(webQuery.Query), queryArgs...) + if err != nil { + return false, err + } + defer func() { _ = rows.Close() }() + + if !rows.Next() { + return false, nil + } + return true, nil + }() + if err != nil { + return nil, errors.Wrapf(err, "cannot fetch rule %q from %q", id, filterExpr) + } else if !evaluates { + continue + } + outRuleIds = append(outRuleIds, id) + } + + return outRuleIds, nil +} + +// buildCommonEvent creates an event.Event based on Host and (optional) Service IDs. +// +// This function is used by all event builders to create a common event structure that includes the host and service +// names, an Icinga DB Web reference, and the tags for the event. +// Any event type-specific information (like severity, message, etc.) is added by the specific event builders. +func (client *Client) buildCommonEvent( + ctx context.Context, + hostId, serviceId types.Binary, +) (*event.Event, *redisLookupResult, error) { + rlr, err := client.fetchHostServiceFromRedis(ctx, hostId, serviceId) + if err != nil { + return nil, nil, err + } + + var ( + objectName string + objectUrl string + objectTags map[string]string + ) + + if rlr.serviceName != "" { + objectName = rlr.hostName + "!" + rlr.serviceName + objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(rlr.serviceName) + "&host.name=" + utils.RawUrlEncode(rlr.hostName) + objectTags = map[string]string{ + "host": rlr.hostName, + "service": rlr.serviceName, + } + } else { + objectName = rlr.hostName + objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(rlr.hostName) + objectTags = map[string]string{ + "host": rlr.hostName, + } + } + + return &event.Event{ + Name: objectName, + URL: objectUrl, + Tags: objectTags, + ExtraTags: rlr.CustomVars(), + }, rlr, nil +} + +// buildStateHistoryEvent builds a fully initialized event.Event from a state history entry. +// +// The resulted event will have all the necessary information for a state change event, and must +// not be further modified by the caller. 
+func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { + ev, rlr, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + ev.Type = event.TypeState + + if rlr.serviceName != "" { + switch h.HardState { + case 0: + ev.Severity = event.SeverityOK + case 1: + ev.Severity = event.SeverityWarning + case 2: + ev.Severity = event.SeverityCrit + case 3: + ev.Severity = event.SeverityErr + default: + return nil, fmt.Errorf("unexpected service state %d", h.HardState) + } + } else { + switch h.HardState { + case 0: + ev.Severity = event.SeverityOK + case 1: + ev.Severity = event.SeverityCrit + default: + return nil, fmt.Errorf("unexpected host state %d", h.HardState) + } + } + + if h.Output.Valid { + ev.Message = h.Output.String + } + if h.LongOutput.Valid { + ev.Message += "\n" + h.LongOutput.String + } + + return ev, nil +} + +// buildDowntimeHistoryMetaEvent from a downtime history entry. +func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) { + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + switch h.EventType { + case "downtime_start": + ev.Type = event.TypeDowntimeStart + ev.Username = h.Author + ev.Message = h.Comment + ev.Mute = types.MakeBool(true) + ev.MuteReason = "Checkable is in downtime" + + case "downtime_end": + ev.Mute = types.MakeBool(false) + if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool { + ev.Type = event.TypeDowntimeRemoved + ev.Message = "Downtime was cancelled" + + if h.CancelledBy.Valid { + ev.Username = h.CancelledBy.String + } + } else { + ev.Type = event.TypeDowntimeEnd + ev.Message = "Downtime expired" + } + + default: + return nil, fmt.Errorf("unexpected event type %q", h.EventType) + } + + return ev, nil +} + +// buildFlappingHistoryEvent from a flapping history entry. +func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + if h.PercentStateChangeEnd.Valid { + ev.Type = event.TypeFlappingEnd + ev.Message = fmt.Sprintf( + "Checkable stopped flapping (Current flapping value %.2f%% < low threshold %.2f%%)", + h.PercentStateChangeEnd.Float64, h.FlappingThresholdLow) + } else if h.PercentStateChangeStart.Valid { + ev.Type = event.TypeFlappingStart + ev.Message = fmt.Sprintf( + "Checkable started flapping (Current flapping value %.2f%% > high threshold %.2f%%)", + h.PercentStateChangeStart.Float64, h.FlappingThresholdHigh) + ev.Mute = types.MakeBool(true) + ev.MuteReason = "Checkable is flapping" + } else { + return nil, errors.New("flapping history entry has neither percent_state_change_start nor percent_state_change_end") + } + + return ev, nil +} + +// buildAcknowledgementHistoryEvent from an acknowledgment history entry. 
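Back to `evaluateRulesForObject` above: the decoding structs imply that a non-empty rule filter is a JSON document of roughly the following shape. The keys come straight from the struct tags; the filter string, query text and extra parameter values are purely illustrative, as the real documents are generated by Icinga DB Web (see the linked pull requests):

```json
{
  "version": 1,
  "config": {
    "type": "host",
    "filter": "host.name~web*"
  },
  "queries": {
    "host": {
      "query": "SELECT 1 FROM host WHERE host.id = ? AND host.environment_id = ?",
      "parameters": [":host_id", ":environment_id"]
    }
  }
}
```

The `:host_id`, `:service_id` and `:environment_id` placeholders are replaced with the IDs of the checkable being evaluated, every other parameter is passed through verbatim, and a rule matches as soon as its query returns at least one row.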
+func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + if !h.ClearTime.Time().IsZero() { + ev.Type = event.TypeAcknowledgementCleared + ev.Message = "Checkable was cleared" + + if h.ClearedBy.Valid { + ev.Username = h.ClearedBy.String + } + } else if !h.SetTime.Time().IsZero() { + ev.Type = event.TypeAcknowledgementSet + + if h.Comment.Valid { + ev.Message = h.Comment.String + } else { + ev.Message = "Checkable was acknowledged" + } + + if h.Author.Valid { + ev.Username = h.Author.String + } + } else { + return nil, errors.New("acknowledgment history entry has neither a set_time nor a clear_time") + } + + return ev, nil +} + +// Submit this [database.Entity] to the Icinga Notifications API. +// +// Based on the entity's type, a different kind of event will be constructed. The event will be sent to the API in a +// blocking fashion. +// +// Returns true if this entity was processed or cannot be processed any further. Returns false if this entity should be +// retried later. +// +// This method usees the Client's logger. +func (client *Client) Submit(entity database.Entity) bool { + if client.ctx.Err() != nil { + client.logger.Errorw("Cannot process submitted entity as client context is done", zap.Error(client.ctx.Err())) + return true + } + + var ( + ev *event.Event + eventErr error + metaHistory v1history.HistoryTableMeta + ) + + // Keep the type switch in sync with the values of SyncKeyStructPtrs below. + switch h := entity.(type) { + case *v1history.AcknowledgementHistory: + ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + case *v1history.DowntimeHistoryMeta: + ev, eventErr = client.buildDowntimeHistoryMetaEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + case *v1history.FlappingHistory: + ev, eventErr = client.buildFlappingHistoryEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + case *v1history.StateHistory: + if h.StateType != common.HardState { + return true + } + ev, eventErr = client.buildStateHistoryEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + default: + client.logger.Error("Cannot process unsupported type", zap.String("type", fmt.Sprintf("%T", h))) + return true + } + + if eventErr != nil { + client.logger.Errorw("Cannot build event from history entry", + zap.String("type", fmt.Sprintf("%T", entity)), + zap.Error(eventErr)) + return true + } else if ev == nil { + // This really should not happen. + client.logger.Errorw("No event was built, but no error was reported", + zap.String("type", fmt.Sprintf("%T", entity))) + return true + } + + eventLogger := client.logger.With(zap.Object( + "event", + zapcore.ObjectMarshalerFunc(func(encoder zapcore.ObjectEncoder) error { + encoder.AddString("name", ev.Name) + encoder.AddString("type", ev.Type.String()) + return nil + }), + )) + + // The following code accesses Client.rulesInfo. + client.submissionMutex.Lock() + defer client.submissionMutex.Unlock() + + // This loop allows resubmitting an event if the rules have changed. The first try would be the rule update, the + // second try would be the resubmit, and the third try would be for bad luck, e.g., when a second rule update just + // crept in between. If there are three subsequent rule updates, something is wrong. 
+ for try := 0; try < 3; try++ { + eventRuleIds, err := client.evaluateRulesForObject( + client.ctx, + metaHistory.HostId, + metaHistory.ServiceId, + metaHistory.EnvironmentId) + if err != nil { + // While returning false would be more correct, this would result in never being able to refetch new rule + // versions. Consider an invalid object filter expression, which is now impossible to get rid of. + eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err)) + eventRuleIds = []string{} + } + + ev.RulesVersion = client.rulesInfo.Version + ev.RuleIds = eventRuleIds + + newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev) + if errors.Is(err, source.ErrRulesOutdated) { + eventLogger.Infow("Received a rule update from Icinga Notification, resubmitting event", + zap.String("old_rules_version", client.rulesInfo.Version), + zap.String("new_rules_version", newEventRules.Version)) + + client.rulesInfo = newEventRules + + continue + } else if err != nil { + eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried", + zap.String("rules_version", client.rulesInfo.Version), + zap.Any("rules", eventRuleIds), + zap.Error(err)) + return false + } + + eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds)) + return true + } + + eventLogger.Error("Received three rule updates from Icinga Notifications in a row, event will be retried") + return false +} + +var SyncKeyStructPtrs = map[string]any{ + history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil), + history.SyncPipelineDowntime: (*v1history.DowntimeHistoryMeta)(nil), + history.SyncPipelineFlapping: (*v1history.FlappingHistory)(nil), + history.SyncPipelineState: (*v1history.StateHistory)(nil), +} diff --git a/pkg/notifications/redis_fetch.go b/pkg/notifications/redis_fetch.go new file mode 100644 index 000000000..af9666dd2 --- /dev/null +++ b/pkg/notifications/redis_fetch.go @@ -0,0 +1,180 @@ +package notifications + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/icinga/icinga-go-library/backoff" + "github.com/icinga/icinga-go-library/retry" + "github.com/icinga/icinga-go-library/types" +) + +// redisCustomVar is a customvar entry from Redis. +type redisCustomVar struct { + EnvironmentID types.Binary `json:"environment_id"` + Name string `json:"name"` + Value string `json:"value"` +} + +// redisLookupResult defines the structure of the Redis message we're interested in. +type redisLookupResult struct { + hostName string + serviceName string + customVars []*redisCustomVar +} + +// CustomVars returns a mapping of customvar names to values. +func (result redisLookupResult) CustomVars() map[string]string { + m := make(map[string]string) + for _, customvar := range result.customVars { + m[customvar.Name] = customvar.Value + } + + return m +} + +// fetchHostServiceFromRedis retrieves the host and service names and customvars from Redis. +// +// It uses either the hostId or/and serviceId to fetch the corresponding names. If both are provided, +// the returned result will contain the host name and the service name accordingly. Otherwise, it will +// only contain the host name. +// +// The function has a hard coded timeout of five seconds for all HGET and HGETALL commands together. 
+func (client *Client) fetchHostServiceFromRedis(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + hgetFromRedis := func(key, id string) (string, error) { + var data string + err := retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + data, err = client.redisClient.HGet(ctx, key, id).Result() + return + }, + retry.Retryable, + backoff.DefaultBackoff, + retry.Settings{}, + ) + if err != nil { + return "", fmt.Errorf("redis hget %q, %q failed: %w", key, id, err) + } + + return data, nil + } + + getNameFromRedis := func(typ, id string) (string, error) { + data, err := hgetFromRedis("icinga:"+typ, id) + if err != nil { + return "", err + } + + var result struct { + Name string `json:"name"` + } + if err := json.Unmarshal([]byte(data), &result); err != nil { + return "", fmt.Errorf("failed to unmarshal redis result: %w", err) + } + + return result.Name, nil + } + + getCustomVarFromRedis := func(id string) (*redisCustomVar, error) { + data, err := hgetFromRedis("icinga:customvar", id) + if err != nil { + return nil, err + } + + customvar := new(redisCustomVar) + if err := json.Unmarshal([]byte(data), customvar); err != nil { + return nil, fmt.Errorf("failed to unmarshal redis result: %w", err) + } + + return customvar, nil + } + + getObjectCustomVarsFromRedis := func(typ, id string) ([]*redisCustomVar, error) { + var resMap map[string]string + err := retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + res := client.redisClient.HGetAll(ctx, "icinga:"+typ+":customvar") + if err = res.Err(); err != nil { + return + } + + resMap, err = res.Result() + return + }, + retry.Retryable, + backoff.DefaultBackoff, + retry.Settings{}, + ) + if err != nil { + return nil, fmt.Errorf("failed to HGETALL icinga:%s:customvar from Redis: %w", typ, err) + } + + var result struct { + CustomvarId string `json:"customvar_id"` + HostId string `json:"host_id"` + ServiceId string `json:"service_id"` + } + + var customvars []*redisCustomVar + for _, res := range resMap { + if err := json.Unmarshal([]byte(res), &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal redis result: %w", err) + } + + switch typ { + case "host": + if result.HostId != id { + continue + } + case "service": + if result.ServiceId != id { + continue + } + default: + panic(fmt.Sprintf("unexpected object type %q", typ)) + } + + customvar, err := getCustomVarFromRedis(result.CustomvarId) + if err != nil { + return nil, fmt.Errorf("failed to fetch customvar: %w", err) + } + customvars = append(customvars, customvar) + } + + return customvars, nil + } + + var result redisLookupResult + var err error + + result.hostName, err = getNameFromRedis("host", hostId.String()) + if err != nil { + return nil, err + } + + if serviceId != nil { + result.serviceName, err = getNameFromRedis("service", serviceId.String()) + if err != nil { + return nil, err + } + } + + if serviceId == nil { + result.customVars, err = getObjectCustomVarsFromRedis("host", hostId.String()) + } else { + result.customVars, err = getObjectCustomVarsFromRedis("service", serviceId.String()) + } + if err != nil { + return nil, err + } + + return &result, nil +}
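The lookup above reads the Redis hashes that Icinga 2 populates for Icinga DB: object names in `icinga:host` and `icinga:service`, the checkable-to-customvar mapping in `icinga:<type>:customvar`, and the customvars themselves in `icinga:customvar`. The field names below follow the JSON tags used in the lookup code; IDs and values are invented for illustration:

```
HGET icinga:host <host_id>
  -> {"name": "web-01", ...}

HGETALL icinga:host:customvar
  -> <mapping_id>: {"host_id": "<host_id>", "customvar_id": "<customvar_id>", ...}

HGET icinga:customvar <customvar_id>
  -> {"environment_id": "<environment_id>", "name": "os", "value": "Linux"}
```

Note that the customvar step issues an HGETALL over the whole `icinga:<type>:customvar` hash and filters client-side, all within the five-second budget mentioned in the function's doc comment.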