diff --git a/.github/workflows/compliance/check-licenses.sh b/.github/workflows/compliance/check-licenses.sh
index 37d35b8f2..63ff76f6a 100755
--- a/.github/workflows/compliance/check-licenses.sh
+++ b/.github/workflows/compliance/check-licenses.sh
@@ -5,7 +5,7 @@ set -eo pipefail
 find_license_file() {
 	MOD_NAME="$1"
 	LICENSE_DIR="vendor/$MOD_NAME"
-	LICENSE_FILES=({,../}LICENSE{,.txt,.md})
+	LICENSE_FILES=({,../}{,UN}LICENSE{,.txt,.md})
 
 	for LICENSE_FILE in "${LICENSE_FILES[@]}"; do
 		LICENSE_FILE="${LICENSE_DIR}/$LICENSE_FILE"
@@ -29,6 +29,8 @@ list_all_deps() {
 COMPATIBLE_LINE=$(($LINENO + 2))
 COMPATIBLE=(
+	# public domain
+	3cee2c43614ad4572d9d594c81b9348cf45ed5ac # vendor/github.com/vbauerster/mpb/v6/UNLICENSE
 	# MIT
 	66d504eb2f162b9cbf11b07506eeed90c6edabe1 # vendor/github.com/cespare/xxhash/v2/LICENSE.txt
 	1513ff663e946fdcadb630bed670d253b8b22e1e # vendor/github.com/davecgh/go-spew/spew/../LICENSE
diff --git a/cmd/icingadb-migrate/cache.go b/cmd/icingadb-migrate/cache.go
new file mode 100644
index 000000000..d1854c9a9
--- /dev/null
+++ b/cmd/icingadb-migrate/cache.go
@@ -0,0 +1,298 @@
+package main
+
+import (
+	"database/sql"
+	_ "embed"
+	"github.com/jmoiron/sqlx"
+	"github.com/pkg/errors"
+	"math"
+	"strings"
+	"time"
+)
+
+//go:embed embed/event_time_cache_schema.sql
+var eventTimeCacheSchema string
+
+//go:embed embed/previous_hard_state_cache_schema.sql
+var previousHardStateCacheSchema string
+
+// buildEventTimeCache rationale:
+//
+// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
+// That would make the IDO reading even slower than the Icinga DB writing.
+// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
+// and cache it into an SQLite database. Then stream from that database and the IDO.
+//
+// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
+func buildEventTimeCache(ht *historyType, idoColumns []string) {
+	type row = struct {
+		Id            uint64
+		EventTime     int64
+		EventTimeUsec uint32
+		EventIsStart  uint8
+		ObjectId      uint64
+	}
+
+	chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
+		var checkpoint struct {
+			Cnt   int64
+			MaxId sql.NullInt64
+		}
+		cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")
+
+		ht.bar.SetCurrent(checkpoint.Cnt * 2)
+
+		// Stream source data...
+		sliceIdoHistory(
+			ht,
+			"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
+				// For actual migration icinga_objects will be joined anyway,
+				// so it makes no sense to take vanished objects into account.
+				" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
+				ht.idoIdColumn+" <= :toid AND xh."+
+				ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
+			nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
+			func(idoRows []row) (checkpoint interface{}) {
+				for _, idoRow := range idoRows {
+					if idoRow.EventIsStart == 0 {
+						// Ack/flapping end event. Get the start event time:
+						var lst []struct {
+							EventTime     int64
+							EventTimeUsec uint32
+						}
+						cacheSelect(
+							*tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
+							idoRow.ObjectId,
+						)
+
+						// If we have that, ...
+						if len(lst) > 0 {
+							// ... save the start event time for the actual migration:
+							cacheExec(
+								*tx,
+								"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
+								idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
+							)
+
+							// This previously queried info isn't needed anymore.
+							cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
+						}
+					} else {
+						// Ack/flapping start event directly after another start event (per checkable).
+						// The old one won't have (but the new one will) an end event (which will need its time).
+						cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
+
+						// An ack/flapping start event. The following end event (per checkable) will need its time.
+						cacheExec(
+							*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
+							idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
+						)
+					}
+
+					commitPeriodically()
+					checkpoint = idoRow.Id
+				}
+
+				ht.bar.IncrBy(len(idoRows))
+				return
+			},
+		)
+
+		// This info, which was never queried, isn't needed anymore.
+		cacheExec(*tx, "DELETE FROM last_start_time")
+	})
+
+	ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// buildPreviousHardStateCache rationale:
+//
+// Icinga DB's state_history#previous_hard_state would need a subquery.
+// That makes the IDO reading even slower than the Icinga DB writing.
+// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
+// and cache it into an SQLite database. Then stream from that database and the IDO.
+//
+// Similar for notifications. (On non-recoverable errors the whole program exits.)
+func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
+	type row = struct {
+		Id            uint64
+		ObjectId      uint64
+		LastHardState uint8
+	}
+
+	chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
+		var nextIds struct {
+			Cnt   int64
+			MinId sql.NullInt64
+		}
+		cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")
+
+		var previousHardStateCnt int64
+		cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")
+
+		var checkpoint int64
+		if nextIds.MinId.Valid { // there are next_ids
+			checkpoint = nextIds.MinId.Int64 // this kind of cache is filled in descending order
+		} else { // there aren't any next_ids
+			// next_ids contains the most recently processed IDs and is only empty if...
+			if previousHardStateCnt == 0 {
+				// ... we didn't actually start yet...
+				checkpoint = math.MaxInt64 // start from the largest (possible) ID
+			} else {
+				// ... or we've already finished.
+				checkpoint = 0 // make following query no-op
+			}
+		}
+
+		ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)
+
+		// We continue where we finished before. As we build the cache in reverse chronological order:
+		// 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
+		//    a. Start migration after Icinga DB is up and running.
+		//    b. Remove the cache before the next migration trial.
+		// 2. If the history gets cleaned up between two migration trials,
+		//    the difference either just doesn't appear in the cache or, if already there, will be ignored later.
+
+		// Stream source data...
+		sliceIdoHistory(
+			ht,
+			"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
+				// For actual migration icinga_objects will be joined anyway,
+				// so it makes no sense to take vanished objects into account.
+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ + ht.idoIdColumn+" <= :toid AND xh."+ + ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk", + nil, checkpoint, // ... since we were interrupted: + func(idoRows []row) (checkpoint interface{}) { + for _, idoRow := range idoRows { + var nhs []struct{ NextHardState uint8 } + cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId) + + if len(nhs) < 1 { // we just started (per checkable) + // At the moment (we're "travelling back in time") that's the checkable's hard state: + cacheExec( + *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)", + idoRow.ObjectId, idoRow.LastHardState, + ) + + // But for the current time point the previous hard state isn't known, yet: + cacheExec( + *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)", + idoRow.Id, idoRow.ObjectId, + ) + } else if idoRow.LastHardState == nhs[0].NextHardState { + // The hard state didn't change yet (per checkable), + // so this time point also awaits the previous hard state. + cacheExec( + *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)", + idoRow.Id, idoRow.ObjectId, + ) + } else { // the hard state changed (per checkable) + // That past hard state is now available for the processed future time points: + cacheExec( + *tx, + "INSERT INTO previous_hard_state(history_id, previous_hard_state) "+ + "SELECT history_id, ? FROM next_ids WHERE object_id=?", + idoRow.LastHardState, idoRow.ObjectId, + ) + + // Now they have what they wanted: + cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId) + cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId) + + // That's done. + // Now do the same thing as in the "we just started" case above, for the same reason: + + cacheExec( + *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)", + idoRow.ObjectId, idoRow.LastHardState, + ) + + cacheExec( + *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)", + idoRow.Id, idoRow.ObjectId, + ) + } + + commitPeriodically() + checkpoint = idoRow.Id + } + + ht.bar.IncrBy(len(idoRows)) + return + }, + ) + + // No past hard state is available for the processed future time points, assuming pending: + cacheExec( + *tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids", + ) + + // Now they should have what they wanted: + cacheExec(*tx, "DELETE FROM next_hard_state") + cacheExec(*tx, "DELETE FROM next_ids") + }) + + ht.bar.SetTotal(ht.bar.Current(), true) +} + +// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically() +// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.) +// (On non-recoverable errors the whole program exits.) 
+func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) { + logger := log.With("backend", "cache") + + tx, err := cache.Beginx() + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction")) + } + + const commitInterval = 5 * time.Minute + nextCommit := time.Now().Add(commitInterval) + + do(&tx, func() { // commitPeriodically + if now := time.Now(); now.After(nextCommit) { + if err := tx.Commit(); err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction")) + } + + var err error + + tx, err = cache.Beginx() + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction")) + } + + nextCommit = nextCommit.Add(commitInterval) + } + }) + + if err := tx.Commit(); err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction")) + } +} + +// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.) +func cacheGet(cache interface { + Get(dest interface{}, query string, args ...interface{}) error +}, dest interface{}, query string, args ...interface{}) { + if err := cache.Get(dest, query, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } +} + +// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.) +func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) { + if err := cacheTx.Select(dest, query, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } +} + +// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits. 
+func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) { + if _, err := cacheTx.Exec(dml, args...); err != nil { + log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } +} diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go new file mode 100644 index 000000000..8f4c47dfa --- /dev/null +++ b/cmd/icingadb-migrate/convert.go @@ -0,0 +1,843 @@ +package main + +import ( + "database/sql" + _ "embed" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingadb/v1/history" + icingadbTypes "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "strconv" + "strings" + "time" +) + +//go:embed embed/comment_query.sql +var commentMigrationQuery string + +//go:embed embed/downtime_query.sql +var downtimeMigrationQuery string + +//go:embed embed/flapping_query.sql +var flappingMigrationQuery string + +//go:embed embed/notification_query.sql +var notificationMigrationQuery string + +//go:embed embed/state_query.sql +var stateMigrationQuery string + +type commentRow = struct { + CommenthistoryId uint64 + EntryTime int64 + EntryTimeUsec uint32 + EntryType uint8 + AuthorName string + CommentData string + IsPersistent uint8 + ExpirationTime int64 + DeletionTime int64 + DeletionTimeUsec uint32 + Name string + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertCommentRows( + env string, envId, endpointId icingadbTypes.Binary, + _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow, +) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { + var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity + + for _, row := range idoRows { + checkpoint = row.CommenthistoryId + + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + switch row.EntryType { + case 1: // user + id := calcObjectId(env, row.Name) + entryTime := convertTime(row.EntryTime, row.EntryTimeUsec) + removeTime := convertTime(row.DeletionTime, row.DeletionTimeUsec) + expireTime := convertTime(row.ExpirationTime, 0) + + commentHistory = append(commentHistory, &history.CommentHistory{ + CommentHistoryEntity: history.CommentHistoryEntity{CommentId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + CommentHistoryUpserter: history.CommentHistoryUpserter{ + RemoveTime: removeTime, + HasBeenRemoved: icingadbTypes.Bool{Bool: !removeTime.Time().IsZero(), Valid: true}, + }, + EntryTime: entryTime, + Author: row.AuthorName, + Comment: row.CommentData, + EntryType: icingadbTypes.CommentType(row.EntryType), + IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true}, + IsSticky: icingadbTypes.Bool{Bool: false, Valid: true}, + ExpireTime: expireTime, + }) + + h1 := &history.HistoryComment{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "comment_add", row.Name})}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "comment_add", + }, + CommentHistoryId: id, + EntryTime: entryTime, + } + + h1.EventTime.History = h1 + allHistoryComment = append(allHistoryComment, h1) + + if 
!removeTime.Time().IsZero() { // remove + h2 := &history.HistoryComment{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "comment_remove", row.Name})}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "comment_remove", + }, + CommentHistoryId: id, + EntryTime: entryTime, + RemoveTime: removeTime, + ExpireTime: expireTime, + } + + h2.EventTime.History = h2 + allHistoryComment = append(allHistoryComment, h2) + } + case 4: // ack + name := row.Name1 + if row.Name2 != "" { + name += "!" + row.Name2 + } + + setTime := convertTime(row.EntryTime, row.EntryTimeUsec) + setTs := float64(setTime.Time().UnixMilli()) + clearTime := convertTime(row.DeletionTime, row.DeletionTimeUsec) + acknowledgementHistoryId := hashAny([]any{env, name, setTs}) + + acknowledgementHistory = append(acknowledgementHistory, &history.AcknowledgementHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: acknowledgementHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + AckHistoryUpserter: history.AckHistoryUpserter{ClearTime: clearTime}, + SetTime: setTime, + Author: icingadbTypes.String{ + NullString: sql.NullString{ + String: row.AuthorName, + Valid: true, + }, + }, + Comment: icingadbTypes.String{ + NullString: sql.NullString{ + String: row.CommentData, + Valid: true, + }, + }, + ExpireTime: convertTime(row.ExpirationTime, 0), + IsPersistent: icingadbTypes.Bool{ + Bool: row.IsPersistent != 0, + Valid: true, + }, + }) + + h1 := &history.HistoryAck{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]any{env, "ack_set", name, setTs}), + }, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "ack_set", + }, + AcknowledgementHistoryId: acknowledgementHistoryId, + SetTime: setTime, + ClearTime: clearTime, + } + + h1.EventTime.History = h1 + allHistoryAck = append(allHistoryAck, h1) + + if !clearTime.Time().IsZero() { + h2 := &history.HistoryAck{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]any{env, "ack_clear", name, setTs}), + }, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "ack_clear", + }, + AcknowledgementHistoryId: acknowledgementHistoryId, + SetTime: setTime, + ClearTime: clearTime, + } + + h2.EventTime.History = h2 + allHistoryAck = append(allHistoryAck, h2) + } + } + } + + icingaDbInserts = [][]contracts.Entity{commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck} + return +} + +type downtimeRow = struct { + DowntimehistoryId uint64 + EntryTime int64 + AuthorName string + CommentData string + IsFixed uint8 + Duration int64 + ScheduledStartTime int64 + ScheduledEndTime int64 + ActualStartTime int64 + ActualStartTimeUsec uint32 + ActualEndTime int64 + ActualEndTimeUsec uint32 + WasCancelled uint8 + TriggerTime int64 + Name string + ObjecttypeId uint8 + Name1 string + Name2 string + TriggeredBy string +} + +func convertDowntimeRows( + env string, envId, endpointId icingadbTypes.Binary, + _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow, +) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { + var downtimeHistory, allHistory, sla []contracts.Entity + + 
for _, row := range idoRows { + checkpoint = row.DowntimehistoryId + + id := calcObjectId(env, row.Name) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + scheduledStart := convertTime(row.ScheduledStartTime, 0) + scheduledEnd := convertTime(row.ScheduledEndTime, 0) + triggerTime := convertTime(row.TriggerTime, 0) + actualStart := convertTime(row.ActualStartTime, row.ActualStartTimeUsec) + actualEnd := convertTime(row.ActualEndTime, row.ActualEndTimeUsec) + var startTime, endTime, cancelTime icingadbTypes.UnixMilli + + if scheduledEnd.Time().IsZero() { + scheduledEnd = icingadbTypes.UnixMilli(scheduledStart.Time().Add(time.Duration(row.Duration) * time.Second)) + } + + if actualStart.Time().IsZero() { + startTime = scheduledStart + } else { + startTime = actualStart + } + + if actualEnd.Time().IsZero() { + endTime = scheduledEnd + } else { + endTime = actualEnd + } + + if triggerTime.Time().IsZero() { + triggerTime = startTime + } + + if row.WasCancelled != 0 { + cancelTime = actualEnd + } + + downtimeHistory = append(downtimeHistory, &history.DowntimeHistory{ + DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + DowntimeHistoryUpserter: history.DowntimeHistoryUpserter{ + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + CancelTime: cancelTime, + }, + TriggeredById: calcObjectId(env, row.TriggeredBy), + EntryTime: convertTime(row.EntryTime, 0), + Author: row.AuthorName, + Comment: row.CommentData, + IsFlexible: icingadbTypes.Bool{Bool: row.IsFixed == 0, Valid: true}, + FlexibleDuration: uint64(row.Duration) * 1000, + ScheduledStartTime: scheduledStart, + ScheduledEndTime: scheduledEnd, + StartTime: startTime, + EndTime: endTime, + TriggerTime: triggerTime, + }) + + h1 := &history.HistoryDowntime{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "downtime_start", row.Name})}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "downtime_start", + }, + DowntimeHistoryId: id, + StartTime: startTime, + } + + h1.EventTime.History = h1 + allHistory = append(allHistory, h1) + + if !actualEnd.Time().IsZero() { // remove + h2 := &history.HistoryDowntime{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "downtime_end", row.Name})}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "downtime_end", + }, + DowntimeHistoryId: id, + StartTime: startTime, + CancelTime: cancelTime, + EndTime: endTime, + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + } + + h2.EventTime.History = h2 + allHistory = append(allHistory, h2) + } + + s := &history.SlaHistoryDowntime{ + DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + DowntimeStart: startTime, + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + CancelTime: cancelTime, + EndTime: endTime, + } + + s.DowntimeEnd.History = s + sla = append(sla, s) + } + + icingaDbInserts = 
[][]contracts.Entity{downtimeHistory, allHistory, sla} + return +} + +type flappingRow = struct { + FlappinghistoryId uint64 + EventTime int64 + EventTimeUsec uint32 + EventType uint16 + PercentStateChange sql.NullFloat64 + LowThreshold float64 + HighThreshold float64 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertFlappingRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow, +) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + EventTime int64 + EventTimeUsec uint32 + } + selectCache( + &cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?", + idoRows[0].FlappinghistoryId, idoRows[len(idoRows)-1].FlappinghistoryId, + ) + + // Needed for start time (see below). + cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec) + } + + var flappingHistory, flappingHistoryUpserts, allHistory []contracts.Entity + for _, row := range idoRows { + checkpoint = row.FlappinghistoryId + + ts := convertTime(row.EventTime, row.EventTimeUsec) + + // Needed for ID (see below). + var start icingadbTypes.UnixMilli + if row.EventType == 1001 { // end + var ok bool + start, ok = cachedById[row.FlappinghistoryId] + + if !ok { + continue + } + } else { + start = ts + } + + name := row.Name1 + if row.Name2 != "" { + name += "!" + row.Name2 + } + + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + startTime := float64(start.Time().UnixMilli()) + flappingHistoryId := hashAny([]interface{}{env, name, startTime}) + + if row.EventType == 1001 { // end + // The start counterpart should already have been inserted. 
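+			// Upserting (instead of inserting) completes that row with the end-time
+			// columns rather than failing on the duplicate ID.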
+ flappingHistoryUpserts = append(flappingHistoryUpserts, &history.FlappingHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: flappingHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + FlappingHistoryUpserter: history.FlappingHistoryUpserter{ + EndTime: ts, + PercentStateChangeEnd: icingadbTypes.Float{NullFloat64: row.PercentStateChange}, + FlappingThresholdLow: float32(row.LowThreshold), + FlappingThresholdHigh: float32(row.HighThreshold), + }, + StartTime: start, + }) + + h := &history.HistoryFlapping{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]interface{}{env, "flapping_end", name, startTime}), + }, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "flapping_end", + }, + FlappingHistoryId: flappingHistoryId, + StartTime: start, + EndTime: ts, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } else { + flappingHistory = append(flappingHistory, &history.FlappingHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: flappingHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + FlappingHistoryUpserter: history.FlappingHistoryUpserter{ + FlappingThresholdLow: float32(row.LowThreshold), + FlappingThresholdHigh: float32(row.HighThreshold), + }, + StartTime: start, + PercentStateChangeStart: icingadbTypes.Float{NullFloat64: row.PercentStateChange}, + }) + + h := &history.HistoryFlapping{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]interface{}{env, "flapping_start", name, startTime}), + }, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "flapping_start", + }, + FlappingHistoryId: flappingHistoryId, + StartTime: start, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } + } + + icingaDbInserts = [][]contracts.Entity{flappingHistory, allHistory} + icingaDbUpserts = [][]contracts.Entity{flappingHistoryUpserts} + return +} + +type notificationRow = struct { + NotificationId uint64 + NotificationReason uint8 + EndTime int64 + EndTimeUsec uint32 + State uint8 + Output string + LongOutput sql.NullString + ContactsNotified uint16 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertNotificationRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow, +) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + PreviousHardState uint8 + } + selectCache( + &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? 
AND ?", + idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId, + ) + + cachedById := make(map[uint64]uint8, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = c.PreviousHardState + } + + var contacts []struct { + NotificationId uint64 + Name1 string + } + + { + var query = ido.Rebind( + "SELECT c.notification_id, o.name1 FROM icinga_contactnotifications c " + + "INNER JOIN icinga_objects o ON o.object_id=c.contact_object_id WHERE c.notification_id BETWEEN ? AND ?", + ) + + err := ido.Select(&contacts, query, idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId) + if err != nil { + log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + contactsById := map[uint64]map[string]struct{}{} + for _, contact := range contacts { + perId, ok := contactsById[contact.NotificationId] + if !ok { + perId = map[string]struct{}{} + contactsById[contact.NotificationId] = perId + } + + perId[contact.Name1] = struct{}{} + } + + var notificationHistory, userNotificationHistory, allHistory []contracts.Entity + for _, row := range idoRows { + checkpoint = row.NotificationId + + previousHardState, ok := cachedById[row.NotificationId] + if !ok { + continue + } + + // The IDO tracks only sent notifications, but not notification config objects, nor even their names. + // We have to improvise. By the way we avoid unwanted collisions between synced and migrated data via "ID" + // instead of "HOST[!SERVICE]!NOTIFICATION" (ok as this name won't be parsed, but only hashed) and between + // migrated data itself via the history ID as object name, i.e. one "virtual object" per sent notification. + name := strconv.FormatUint(row.NotificationId, 10) + + nt := convertNotificationType(row.NotificationReason, row.State) + + ntEnum, err := nt.Value() + if err != nil { + continue + } + + ts := convertTime(row.EndTime, row.EndTimeUsec) + tsMilli := float64(ts.Time().UnixMilli()) + notificationHistoryId := hashAny([]interface{}{env, name, ntEnum, tsMilli}) + id := hashAny([]interface{}{env, "notification", name, ntEnum, tsMilli}) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + text := row.Output + if row.LongOutput.Valid { + text += "\n\n" + row.LongOutput.String + } + + notificationHistory = append(notificationHistory, &history.NotificationHistory{ + HistoryTableEntity: history.HistoryTableEntity{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: notificationHistoryId}, + }, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + NotificationId: calcObjectId(env, name), + Type: nt, + SendTime: ts, + State: row.State, + PreviousHardState: previousHardState, + Text: icingadbTypes.String{ + NullString: sql.NullString{ + String: text, + Valid: true, + }, + }, + UsersNotified: row.ContactsNotified, + }) + + allHistory = append(allHistory, &history.HistoryNotification{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "notification", + }, + NotificationHistoryId: notificationHistoryId, + EventTime: ts, + }) + + for contact := range contactsById[row.NotificationId] { + userId := calcObjectId(env, contact) + + userNotificationHistory = append(userNotificationHistory, 
&history.UserNotificationHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{ + Id: utils.Checksum(append(append([]byte(nil), notificationHistoryId...), userId...)), + }, + }, + EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: envId}, + NotificationHistoryId: notificationHistoryId, + UserId: userId, + }) + } + } + + icingaDbInserts = [][]contracts.Entity{notificationHistory, userNotificationHistory, allHistory} + return +} + +// convertNotificationType maps IDO values[1] to Icinga DB ones[2]. +// +// [1]: https://github.com/Icinga/icinga2/blob/32c7f7730db154ba0dff5856a8985d125791c/lib/db_ido/dbevents.cpp#L1507-L1524 +// [2]: https://github.com/Icinga/icingadb/blob/8f31ac143875498797725adb9bfacf3d4/pkg/types/notification_type.go#L53-L61 +func convertNotificationType(notificationReason, state uint8) icingadbTypes.NotificationType { + switch notificationReason { + case 0: // state + if state == 0 { + return 64 // recovery + } else { + return 32 // problem + } + case 1: // acknowledgement + return 16 + case 2: // flapping start + return 128 + case 3: // flapping end + return 256 + case 5: // downtime start + return 1 + case 6: // downtime end + return 2 + case 7: // downtime removed + return 4 + case 8: // custom + return 8 + default: // bad notification type + return 0 + } +} + +type stateRow = struct { + StatehistoryId uint64 + StateTime int64 + StateTimeUsec uint32 + State uint8 + StateType uint8 + CurrentCheckAttempt uint16 + MaxCheckAttempts uint16 + LastState uint8 + LastHardState uint8 + Output sql.NullString + LongOutput sql.NullString + CheckSource sql.NullString + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertStateRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow, +) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + PreviousHardState uint8 + } + selectCache( + &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? 
AND ?", + idoRows[0].StatehistoryId, idoRows[len(idoRows)-1].StatehistoryId, + ) + + cachedById := make(map[uint64]uint8, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = c.PreviousHardState + } + + var stateHistory, allHistory, sla []contracts.Entity + for _, row := range idoRows { + checkpoint = row.StatehistoryId + + previousHardState, ok := cachedById[row.StatehistoryId] + if !ok { + continue + } + + name := strings.Join([]string{row.Name1, row.Name2}, "!") + ts := convertTime(row.StateTime, row.StateTimeUsec) + tsMilli := float64(ts.Time().UnixMilli()) + stateHistoryId := hashAny([]interface{}{env, name, tsMilli}) + id := hashAny([]interface{}{env, "state_change", name, tsMilli}) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + stateHistory = append(stateHistory, &history.StateHistory{ + HistoryTableEntity: history.HistoryTableEntity{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: stateHistoryId}, + }, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + EventTime: ts, + StateType: icingadbTypes.StateType(row.StateType), + SoftState: row.State, + HardState: row.LastHardState, + PreviousSoftState: row.LastState, + PreviousHardState: previousHardState, + CheckAttempt: uint8(row.CurrentCheckAttempt), + Output: icingadbTypes.String{NullString: row.Output}, + LongOutput: icingadbTypes.String{NullString: row.LongOutput}, + MaxCheckAttempts: uint32(row.MaxCheckAttempts), + CheckSource: icingadbTypes.String{NullString: row.CheckSource}, + }) + + allHistory = append(allHistory, &history.HistoryState{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "state_change", + }, + StateHistoryId: stateHistoryId, + EventTime: ts, + }) + + if icingadbTypes.StateType(row.StateType) == icingadbTypes.StateHard { + // only hard state changes are relevant for SLA history, discard all others + + sla = append(sla, &history.SlaHistoryState{ + HistoryTableEntity: history.HistoryTableEntity{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: stateHistoryId}, + }, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + EventTime: ts, + StateType: icingadbTypes.StateType(row.StateType), + HardState: row.LastHardState, + PreviousHardState: previousHardState, + }) + } + } + + icingaDbInserts = [][]contracts.Entity{stateHistory, allHistory, sla} + return +} diff --git a/cmd/icingadb-migrate/embed/comment_query.sql b/cmd/icingadb-migrate/embed/comment_query.sql new file mode 100644 index 000000000..774ccf970 --- /dev/null +++ b/cmd/icingadb-migrate/embed/comment_query.sql @@ -0,0 +1,11 @@ +SELECT ch.commenthistory_id, UNIX_TIMESTAMP(ch.entry_time) entry_time, + ch.entry_time_usec, ch.entry_type, ch.author_name, ch.comment_data, ch.is_persistent, + COALESCE(UNIX_TIMESTAMP(ch.expiration_time), 0) expiration_time, + COALESCE(UNIX_TIMESTAMP(ch.deletion_time), 0) deletion_time, + ch.deletion_time_usec, ch.name, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_commenthistory ch USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=ch.object_id +WHERE 
ch.commenthistory_id BETWEEN :fromid AND :toid +AND ch.commenthistory_id > :checkpoint -- where we were interrupted +ORDER BY ch.commenthistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/downtime_query.sql b/cmd/icingadb-migrate/embed/downtime_query.sql new file mode 100644 index 000000000..e3d36bf02 --- /dev/null +++ b/cmd/icingadb-migrate/embed/downtime_query.sql @@ -0,0 +1,14 @@ +SELECT dh.downtimehistory_id, UNIX_TIMESTAMP(dh.entry_time) entry_time, dh.author_name, dh.comment_data, + dh.is_fixed, dh.duration, UNIX_TIMESTAMP(dh.scheduled_start_time) scheduled_start_time, + COALESCE(UNIX_TIMESTAMP(dh.scheduled_end_time), 0) scheduled_end_time, + COALESCE(UNIX_TIMESTAMP(dh.actual_start_time), 0) actual_start_time, dh.actual_start_time_usec, + COALESCE(UNIX_TIMESTAMP(dh.actual_end_time), 0) actual_end_time, dh.actual_end_time_usec, dh.was_cancelled, + COALESCE(UNIX_TIMESTAMP(dh.trigger_time), 0) trigger_time, dh.name, o.objecttype_id, + o.name1, COALESCE(o.name2, '') name2, COALESCE(sd.name, '') triggered_by +FROM icinga_downtimehistory dh USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=dh.object_id +LEFT JOIN icinga_scheduleddowntime sd ON sd.scheduleddowntime_id=dh.triggered_by_id +WHERE dh.downtimehistory_id BETWEEN :fromid AND :toid +AND dh.downtimehistory_id > :checkpoint -- where we were interrupted +ORDER BY dh.downtimehistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/event_time_cache_schema.sql b/cmd/icingadb-migrate/embed/event_time_cache_schema.sql new file mode 100644 index 000000000..5940754be --- /dev/null +++ b/cmd/icingadb-migrate/embed/event_time_cache_schema.sql @@ -0,0 +1,15 @@ +PRAGMA main.auto_vacuum = 1; + +-- Icinga DB's flapping_history#start_time per flapping_end row (IDO's icinga_flappinghistory#flappinghistory_id). +CREATE TABLE IF NOT EXISTS end_start_time ( + history_id INT PRIMARY KEY, + event_time INT NOT NULL, + event_time_usec INT NOT NULL +); + +-- Helper table, the last start_time per icinga_statehistory#object_id. +CREATE TABLE IF NOT EXISTS last_start_time ( + object_id INT PRIMARY KEY, + event_time INT NOT NULL, + event_time_usec INT NOT NULL +); diff --git a/cmd/icingadb-migrate/embed/flapping_query.sql b/cmd/icingadb-migrate/embed/flapping_query.sql new file mode 100644 index 000000000..5e25bded7 --- /dev/null +++ b/cmd/icingadb-migrate/embed/flapping_query.sql @@ -0,0 +1,9 @@ +SELECT fh.flappinghistory_id, UNIX_TIMESTAMP(fh.event_time) event_time, + fh.event_time_usec, fh.event_type, fh.percent_state_change, fh.low_threshold, + fh.high_threshold, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_flappinghistory fh USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=fh.object_id +WHERE fh.flappinghistory_id BETWEEN :fromid AND :toid +AND fh.flappinghistory_id > :checkpoint -- where we were interrupted +ORDER BY fh.flappinghistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql b/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql new file mode 100644 index 000000000..54c1c0055 --- /dev/null +++ b/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS ido_migration_progress ( + environment_id CHAR(40) NOT NULL, -- Hex SHA1. 
Rationale: CHAR(40) is not RDBMS-specific + history_type VARCHAR(63) NOT NULL, + from_ts BIGINT NOT NULL, + to_ts BIGINT NOT NULL, + last_ido_id BIGINT NOT NULL, + + CONSTRAINT pk_ido_migration_progress PRIMARY KEY (environment_id, history_type, from_ts, to_ts) +); diff --git a/cmd/icingadb-migrate/embed/notification_query.sql b/cmd/icingadb-migrate/embed/notification_query.sql new file mode 100644 index 000000000..f963b2e9a --- /dev/null +++ b/cmd/icingadb-migrate/embed/notification_query.sql @@ -0,0 +1,9 @@ +SELECT n.notification_id, n.notification_reason, UNIX_TIMESTAMP(n.end_time) end_time, + n.end_time_usec, n.state, COALESCE(n.output, '') output, n.long_output, + n.contacts_notified, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_notifications n USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=n.object_id +WHERE n.notification_id BETWEEN :fromid AND :toid +AND n.notification_id <= :cache_limit AND n.notification_id > :checkpoint -- where we were interrupted +ORDER BY n.notification_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql b/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql new file mode 100644 index 000000000..315f22dc9 --- /dev/null +++ b/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql @@ -0,0 +1,22 @@ +PRAGMA main.auto_vacuum = 1; + +-- Icinga DB's state_history#previous_hard_state per IDO's icinga_statehistory#statehistory_id. +CREATE TABLE IF NOT EXISTS previous_hard_state ( + history_id INT PRIMARY KEY, + previous_hard_state INT NOT NULL +); + +-- Helper table, the current last_hard_state per icinga_statehistory#object_id. +CREATE TABLE IF NOT EXISTS next_hard_state ( + object_id INT PRIMARY KEY, + next_hard_state INT NOT NULL +); + +-- Helper table for stashing icinga_statehistory#statehistory_id until last_hard_state changes. 
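-- Illustrative walk-through: scanning one object's IDs 5, 4, 3 backwards with last_hard_state 0, 0, 2,
-- IDs 5 and 4 are stashed here until the change at ID 3 yields previous_hard_state=2 for both of them.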
+CREATE TABLE IF NOT EXISTS next_ids (
+  object_id INT NOT NULL,
+  history_id INT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS next_ids_object_id ON next_ids (object_id);
+CREATE INDEX IF NOT EXISTS next_ids_history_id ON next_ids (history_id);
diff --git a/cmd/icingadb-migrate/embed/state_query.sql b/cmd/icingadb-migrate/embed/state_query.sql
new file mode 100644
index 000000000..3e95d4824
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/state_query.sql
@@ -0,0 +1,9 @@
+SELECT sh.statehistory_id, UNIX_TIMESTAMP(sh.state_time) state_time, sh.state_time_usec, sh.state,
+  sh.state_type, sh.current_check_attempt, sh.max_check_attempts, sh.last_state, sh.last_hard_state,
+  sh.output, sh.long_output, sh.check_source, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
+FROM icinga_statehistory sh USE INDEX (PRIMARY)
+INNER JOIN icinga_objects o ON o.object_id=sh.object_id
+WHERE sh.statehistory_id BETWEEN :fromid AND :toid
+AND sh.statehistory_id <= :cache_limit AND sh.statehistory_id > :checkpoint -- where we were interrupted
+ORDER BY sh.statehistory_id -- this way we know what has already been migrated from just the last row's ID
+LIMIT :bulk
diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go
new file mode 100644
index 000000000..af890fb36
--- /dev/null
+++ b/cmd/icingadb-migrate/main.go
@@ -0,0 +1,500 @@
+package main
+
+import (
+	"context"
+	"crypto/sha1"
+	"database/sql"
+	_ "embed"
+	"encoding/hex"
+	"fmt"
+	"github.com/creasty/defaults"
+	"github.com/goccy/go-yaml"
+	"github.com/icinga/icingadb/pkg/config"
+	"github.com/icinga/icingadb/pkg/contracts"
+	"github.com/icinga/icingadb/pkg/icingadb"
+	"github.com/icinga/icingadb/pkg/logging"
+	icingadbTypes "github.com/icinga/icingadb/pkg/types"
+	"github.com/icinga/icingadb/pkg/utils"
+	"github.com/jessevdk/go-flags"
+	"github.com/jmoiron/sqlx"
+	"github.com/jmoiron/sqlx/reflectx"
+	_ "github.com/mattn/go-sqlite3"
+	"github.com/pkg/errors"
+	"github.com/vbauerster/mpb/v6"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"math"
+	"os"
+	"path"
+	"path/filepath"
+	"regexp"
+	"strings"
+	"time"
+)
+
+// Flags defines the CLI flags.
+type Flags struct {
+	// Config is the path to the config file.
+	Config string `short:"c" long:"config" description:"path to config file" required:"true"`
+	// Cache is a (not necessarily yet existing) directory for caching.
+	Cache string `short:"t" long:"cache" description:"path for caching" required:"true"`
+}
+
+// Config defines the YAML config structure.
+type Config struct {
+	IDO struct {
+		config.Database `yaml:"-,inline"`
+		From            int32 `yaml:"from"`
+		To              int32 `yaml:"to" default:"2147483647"`
+	} `yaml:"ido"`
+	IcingaDB config.Database `yaml:"icingadb"`
+	// Icinga2 specifies information the IDO doesn't provide.
+	Icinga2 struct {
+		// Env specifies the environment ID, hex.
+		Env string `yaml:"env"`
+		// Endpoint specifies the name of the main endpoint writing to IDO.
+		Endpoint string `yaml:"endpoint"`
+	} `yaml:"icinga2"`
+}
+
+// main validates the CLI, parses the config and migrates history from IDO to Icinga DB (see comments below).
+// Most of the called functions exit the whole program by themselves on non-recoverable errors.
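+//
+// An illustrative config file matching the Config struct above (every value is a
+// placeholder, not a default):
+//
+//	ido:
+//	  type: mysql
+//	  host: 192.0.2.10
+//	  database: icinga_ido
+//	  user: icinga
+//	  password: secret
+//	icingadb:
+//	  type: mysql
+//	  host: 192.0.2.11
+//	  database: icingadb
+//	  user: icingadb
+//	  password: secret
+//	icinga2:
+//	  env: 0123456789abcdef0123456789abcdef01234567
+//	  endpoint: master-1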
+func main() {
+	f := &Flags{}
+	if _, err := flags.NewParser(f, flags.Default).Parse(); err != nil {
+		os.Exit(2)
+	}
+
+	c, ex := parseConfig(f)
+	if c == nil {
+		os.Exit(ex)
+	}
+
+	envId, err := hex.DecodeString(c.Icinga2.Env)
+	if err != nil {
+		_, _ = fmt.Fprintf(os.Stderr, "bad env ID: %s\n", err.Error())
+		os.Exit(2)
+	}
+
+	defer func() { _ = log.Sync() }()
+
+	log.Info("Starting IDO to Icinga DB history migration")
+
+	ido, idb := connectAll(c)
+
+	if err := idb.CheckSchema(context.Background()); err != nil {
+		log.Fatalf("%+v", err)
+	}
+
+	// Start repeatable-read-isolated transactions (consistent SELECTs)
+	// not to have to care for IDO data changes during migration.
+	startIdoTx(ido)
+
+	// Prepare the directory structure the following fillCache() will need later.
+	mkCache(f, c, idb.Mapper)
+
+	log.Info("Computing progress")
+
+	// Convert Config#IDO.From and .To to IDs to restrict data by PK.
+	computeIdRange(c)
+
+	// computeProgress figures out which data has already been migrated
+	// not to start from the beginning every time in the following migrate().
+	computeProgress(c, idb, envId)
+
+	// On rationale read buildEventTimeCache() and buildPreviousHardStateCache() docs.
+	log.Info("Filling cache")
+	fillCache()
+
+	log.Info("Actually migrating")
+	migrate(c, idb, envId)
+
+	log.Info("Cleaning up cache")
+	cleanupCache(f)
+}
+
+// parseConfig validates the f.Config file and returns the config and -1, or nil and an exit code on failure.
+func parseConfig(f *Flags) (_ *Config, exit int) {
+	cf, err := os.Open(f.Config)
+	if err != nil {
+		_, _ = fmt.Fprintf(os.Stderr, "can't open config file: %s\n", err.Error())
+		return nil, 2
+	}
+	defer func() { _ = cf.Close() }()
+
+	c := &Config{}
+	if err := defaults.Set(c); err != nil {
+		_, _ = fmt.Fprintf(os.Stderr, "can't set config defaults: %s\n", err.Error())
+		return nil, 2
+	}
+
+	if err := yaml.NewDecoder(cf).Decode(c); err != nil {
+		_, _ = fmt.Fprintf(os.Stderr, "can't parse config file: %s\n", err.Error())
+		return nil, 2
+	}
+
+	return c, -1
+}
+
+var nonWords = regexp.MustCompile(`\W+`)
+
+// mkCache ensures the <cache dir>/<history type>.sqlite3 files are present and contain their schema
+// and initializes types[*].cache. (On non-recoverable errors the whole program exits.)
+func mkCache(f *Flags, c *Config, mapper *reflectx.Mapper) {
+	log.Info("Preparing cache")
+
+	if err := os.MkdirAll(f.Cache, 0700); err != nil {
+		log.With("dir", f.Cache).Fatalf("%+v", errors.Wrap(err, "can't create directory"))
+	}
+
+	types.forEach(func(ht *historyType) {
+		if ht.cacheSchema == "" {
+			return
+		}
+
+		file := path.Join(f.Cache, fmt.Sprintf(
+			"%s_%d-%d.sqlite3", nonWords.ReplaceAllLiteralString(ht.name, "_"), c.IDO.From, c.IDO.To,
+		))
+
+		var err error
+
+		ht.cache, err = sqlx.Open("sqlite3", "file:"+file)
+		if err != nil {
+			log.With("file", file).Fatalf("%+v", errors.Wrap(err, "can't open SQLite database"))
+		}
+
+		ht.cacheFile = file
+		ht.cache.Mapper = mapper
+
+		if _, err := ht.cache.Exec(ht.cacheSchema); err != nil {
+			log.With("file", file, "ddl", ht.cacheSchema).
+				Fatalf("%+v", errors.Wrap(err, "can't import schema into SQLite database"))
+		}
+	})
+}
+
+// connectAll connects to ido and idb (Icinga DB) as c specifies. (On non-recoverable errors the whole program exits.)
+func connectAll(c *Config) (ido, idb *icingadb.DB) { + log.Info("Connecting to databases") + eg, _ := errgroup.WithContext(context.Background()) + + eg.Go(func() error { + ido = connect("IDO", &c.IDO.Database) + return nil + }) + + eg.Go(func() error { + idb = connect("Icinga DB", &c.IcingaDB) + return nil + }) + + _ = eg.Wait() + return +} + +// connect connects to which DB as cfg specifies. (On non-recoverable errors the whole program exits.) +func connect(which string, cfg *config.Database) *icingadb.DB { + db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second)) + if err != nil { + log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) + } + + if err := db.Ping(); err != nil { + log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) + } + + return db +} + +// startIdoTx initializes types[*].snapshot with new repeatable-read-isolated ido transactions. +// (On non-recoverable errors the whole program exits.) +func startIdoTx(ido *icingadb.DB) { + types.forEach(func(ht *historyType) { + tx, err := ido.BeginTxx(context.Background(), &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't begin snapshot transaction")) + } + + ht.snapshot = tx + }) +} + +// computeIdRange initializes types[*].fromId and types[*].toId. +// (On non-recoverable errors the whole program exits.) +func computeIdRange(c *Config) { + types.forEach(func(ht *historyType) { + getBorderId := func(id *uint64, timeColumns []string, compOperator string, borderTime int32, sortOrder string) { + deZeroFied := make([]string, 0, len(timeColumns)) + for _, column := range timeColumns { + deZeroFied = append(deZeroFied, fmt.Sprintf( + "CASE WHEN %[1]s < '1970-01-03 00:00:00' THEN NULL ELSE %[1]s END", column, + )) + } + + var timeExpr string + if len(deZeroFied) > 1 { + timeExpr = "COALESCE(" + strings.Join(deZeroFied, ",") + ")" + } else { + timeExpr = deZeroFied[0] + } + + query := ht.snapshot.Rebind( + "SELECT " + ht.idoIdColumn + " FROM " + ht.idoTable + " WHERE " + timeExpr + " " + compOperator + + " FROM_UNIXTIME(?) ORDER BY " + ht.idoIdColumn + " " + sortOrder + " LIMIT 1", + ) + + switch err := ht.snapshot.Get(id, query, borderTime); err { + case nil, sql.ErrNoRows: + default: + log.With("backend", "IDO", "query", query, "args", []any{borderTime}). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + ht.fromId = math.MaxInt64 + + getBorderId(&ht.fromId, ht.idoEndColumns, ">=", c.IDO.From, "ASC") + getBorderId(&ht.toId, ht.idoStartColumns, "<=", c.IDO.To, "DESC") + }) +} + +//go:embed embed/ido_migration_progress_schema.sql +var idoMigrationProgressSchema string + +// computeProgress initializes types[*].lastId, types[*].total and types[*].done. +// (On non-recoverable errors the whole program exits.) +func computeProgress(c *Config, idb *icingadb.DB, envId []byte) { + if _, err := idb.Exec(idoMigrationProgressSchema); err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't create table ido_migration_progress")) + } + + envIdHex := hex.EncodeToString(envId) + types.forEach(func(ht *historyType) { + var query = idb.Rebind( + "SELECT last_ido_id FROM ido_migration_progress" + + " WHERE environment_id=? AND history_type=? AND from_ts=? AND to_ts=?", + ) + + args := []any{envIdHex, ht.name, c.IDO.From, c.IDO.To} + + if err := idb.Get(&ht.lastId, query, args...); err != nil && err != sql.ErrNoRows { + log.With("backend", "Icinga DB", "query", query, "args", args). 
+ Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + }) + + types.forEach(func(ht *historyType) { + if ht.cacheFiller != nil { + err := ht.snapshot.Get( + &ht.cacheTotal, + ht.snapshot.Rebind( + // For actual migration icinga_objects will be joined anyway, + // so it makes no sense to take vanished objects into account. + "SELECT COUNT(*) FROM "+ht.idoTable+ + " xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ht.idoIdColumn+" <= ?", + ), + ht.toId, + ) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't count query")) + } + } + }) + + types.forEach(func(ht *historyType) { + var rows []struct { + Migrated uint8 + Cnt int64 + } + + err := ht.snapshot.Select( + &rows, + ht.snapshot.Rebind( + // For actual migration icinga_objects will be joined anyway, + // so it makes no sense to take vanished objects into account. + "SELECT CASE WHEN xh."+ht.idoIdColumn+"<=? THEN 1 ELSE 0 END migrated, COUNT(*) cnt FROM "+ + ht.idoTable+" xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ + ht.idoIdColumn+" BETWEEN ? AND ? GROUP BY migrated", + ), + ht.lastId, ht.fromId, ht.toId, + ) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't count query")) + } + + for _, row := range rows { + ht.total += row.Cnt + + if row.Migrated == 1 { + ht.done = row.Cnt + } + } + + log.Infow("Counted migrated IDO events", "type", ht.name, "migrated", ht.done, "total", ht.total) + }) +} + +// fillCache fills /.sqlite3 (actually types[*].cacheFiller does). +func fillCache() { + progress := mpb.New() + for _, ht := range types { + if ht.cacheFiller != nil { + ht.setupBar(progress, ht.cacheTotal) + } + } + + types.forEach(func(ht *historyType) { + if ht.cacheFiller != nil { + ht.cacheFiller(ht) + } + }) + + progress.Wait() +} + +// migrate does the actual migration. +func migrate(c *Config, idb *icingadb.DB, envId []byte) { + endpointId := sha1.Sum([]byte(c.Icinga2.Endpoint)) + + progress := mpb.New() + for _, ht := range types { + ht.setupBar(progress, ht.total) + } + + types.forEach(func(ht *historyType) { + ht.migrate(c, idb, envId, endpointId, ht) + }) + + progress.Wait() +} + +// migrate does the actual migration for one history type. +func migrateOneType[IdoRow any]( + c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, ht *historyType, + convertRows func(env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, + idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any), +) { + var lastQuery string + var lastStmt *sqlx.Stmt + + defer func() { + if lastStmt != nil { + _ = lastStmt.Close() + } + }() + + selectCache := func(dest interface{}, query string, args ...interface{}) { + // Prepare new one, if old one doesn't fit anymore. + if query != lastQuery { + if lastStmt != nil { + _ = lastStmt.Close() + } + + var err error + + lastStmt, err = ht.cache.Preparex(query) + if err != nil { + log.With("backend", "cache", "query", query). + Fatalf("%+v", errors.Wrap(err, "can't prepare query")) + } + + lastQuery = query + } + + if err := lastStmt.Select(dest, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + var args map[string]interface{} + + // For the case that the cache was older that the IDO, + // but ht.cacheFiller couldn't update it, limit (WHERE) our source data set. 
+	if ht.cacheLimitQuery != "" {
+		var limit sql.NullInt64
+		cacheGet(ht.cache, &limit, ht.cacheLimitQuery)
+		args = map[string]interface{}{"cache_limit": limit.Int64}
+	}
+
+	upsertProgress, _ := idb.BuildUpsertStmt(&IdoMigrationProgress{})
+	envIdHex := hex.EncodeToString(envId)
+
+	ht.bar.SetCurrent(ht.done)
+
+	// Stream IDO rows, ...
+	sliceIdoHistory(
+		ht, ht.migrationQuery, args, ht.lastId,
+		func(idoRows []IdoRow) (checkpoint interface{}) {
+			// ... convert them, ...
+			inserts, upserts, lastIdoId := convertRows(
+				c.Icinga2.Env, envId, endpointId[:], selectCache, ht.snapshot, idoRows,
+			)
+
+			// ... and insert them:
+
+			for _, op := range []struct {
+				kind     string
+				data     [][]contracts.Entity
+				streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
+			}{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} {
+				for _, table := range op.data {
+					if len(table) < 1 {
+						continue
+					}
+
+					ch := make(chan contracts.Entity, len(table))
+					for _, row := range table {
+						ch <- row
+					}
+
+					close(ch)
+
+					if err := op.streamer(context.Background(), ch); err != nil {
+						log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
+							Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+					}
+				}
+			}
+
+			if lastIdoId != nil {
+				args := map[string]interface{}{"history_type": ht.name, "last_ido_id": lastIdoId}
+
+				_, err := idb.NamedExec(upsertProgress, &IdoMigrationProgress{
+					IdoMigrationProgressUpserter{lastIdoId}, envIdHex, ht.name, c.IDO.From, c.IDO.To,
+				})
+				if err != nil {
+					log.With("backend", "Icinga DB", "dml", upsertProgress, "args", args).
+						Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+				}
+			}
+
+			ht.bar.IncrBy(len(idoRows))
+			return lastIdoId
+		},
+	)
+
+	ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// cleanupCache removes the <cache dir>/*.sqlite3 files.
+func cleanupCache(f *Flags) {
+	types.forEach(func(ht *historyType) {
+		if ht.cacheFile != "" {
+			if err := ht.cache.Close(); err != nil {
+				log.With("file", ht.cacheFile).Warnf("%+v", errors.Wrap(err, "can't close SQLite database"))
+			}
+		}
+	})
+
+	if matches, err := filepath.Glob(path.Join(f.Cache, "*.sqlite3")); err == nil {
+		for _, match := range matches {
+			if err := os.Remove(match); err != nil {
+				log.With("file", match).Warnf("%+v", errors.Wrap(err, "can't remove SQLite database"))
+			}
+		}
+	} else {
+		log.With("dir", f.Cache).Warnf("%+v", errors.Wrap(err, "can't list SQLite databases"))
+	}
+}
diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go
new file mode 100644
index 000000000..3163dd8fa
--- /dev/null
+++ b/cmd/icingadb-migrate/misc.go
@@ -0,0 +1,317 @@
+package main
+
+import (
+	"context"
+	"crypto/sha1"
+	"github.com/icinga/icingadb/pkg/contracts"
+	"github.com/icinga/icingadb/pkg/driver"
+	"github.com/icinga/icingadb/pkg/icingadb"
+	"github.com/icinga/icingadb/pkg/icingadb/objectpacker"
+	icingadbTypes "github.com/icinga/icingadb/pkg/types"
+	"github.com/jmoiron/sqlx"
+	"github.com/pkg/errors"
+	"github.com/vbauerster/mpb/v6"
+	"github.com/vbauerster/mpb/v6/decor"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"strings"
+	"time"
+)
+
+type IdoMigrationProgressUpserter struct {
+	LastIdoId any `json:"last_ido_id"`
+}
+
+// Upsert implements the contracts.Upserter interface.
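+// Returning only this embedded part means an upsert updates just last_ido_id on conflict,
+// while the primary key columns of IdoMigrationProgress stay untouched.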
+func (impu *IdoMigrationProgressUpserter) Upsert() interface{} {
+ return impu
+}
+
+type IdoMigrationProgress struct {
+ IdoMigrationProgressUpserter `json:",inline"`
+ EnvironmentId string `json:"environment_id"`
+ HistoryType string `json:"history_type"`
+ FromTs int32 `json:"from_ts"`
+ ToTs int32 `json:"to_ts"`
+}
+
+// Assert interface compliance.
+var (
+ _ contracts.Upserter = (*IdoMigrationProgressUpserter)(nil)
+ _ contracts.Upserter = (*IdoMigrationProgress)(nil)
+)
+
+// log is the root logger.
+var log = func() *zap.SugaredLogger {
+ logger, err := zap.NewDevelopmentConfig().Build()
+ if err != nil {
+ panic(err)
+ }
+
+ return logger.Sugar()
+}()
+
+// objectTypes maps IDO values to Icinga DB ones.
+var objectTypes = map[uint8]string{1: "host", 2: "service"}
+
+// hashAny combines objectpacker.PackAny and SHA1 hashing.
+func hashAny(in interface{}) []byte {
+ hash := sha1.New()
+ if err := objectpacker.PackAny(in, hash); err != nil {
+ panic(err)
+ }
+
+ return hash.Sum(nil)
+}
+
+// convertTime converts Unix timestamps from the IDO for Icinga DB.
+func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli {
+ if ts == 0 && tsUs == 0 {
+ return icingadbTypes.UnixMilli{}
+ }
+
+ return icingadbTypes.UnixMilli(time.Unix(ts, int64(tsUs)*int64(time.Microsecond/time.Nanosecond)))
+}
+
+// calcObjectId calculates the ID of the config object named name1 for Icinga DB.
+func calcObjectId(env, name1 string) []byte {
+ if name1 == "" {
+ return nil
+ }
+
+ return hashAny([2]string{env, name1})
+}
+
+// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB.
+func calcServiceId(env, name1, name2 string) []byte {
+ if name2 == "" {
+ return nil
+ }
+
+ return hashAny([2]string{env, name1 + "!" + name2})
+}
+
+// sliceIdoHistory performs query with args+fromid,toid,checkpoint,bulk on ht.snapshot
+// and passes the results to onRows until either the result set is empty or onRows() returns nil.
+// Rationale: split the likely large result set of a query by adding a WHERE condition and a LIMIT,
+// both with :named placeholders (:checkpoint, :bulk).
+// checkpoint is the initial value for the WHERE condition, onRows() returns follow-up ones.
+// (On non-recoverable errors the whole program exits.)
+func sliceIdoHistory[Row any](
+ ht *historyType, query string, args map[string]any,
+ checkpoint interface{}, onRows func([]Row) (checkpoint interface{}),
+) {
+ if args == nil {
+ args = map[string]interface{}{}
+ }
+
+ args["fromid"] = ht.fromId
+ args["toid"] = ht.toId
+ args["checkpoint"] = checkpoint
+ args["bulk"] = 20000
+
+ if ht.snapshot.DriverName() != driver.MySQL {
+ query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "")
+ }
+
+ for {
+ // TODO: use Tx#SelectNamed() one nice day (https://github.com/jmoiron/sqlx/issues/779)
+ stmt, err := ht.snapshot.PrepareNamed(query)
+ if err != nil {
+ log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
+ }
+
+ var rows []Row
+ if err := stmt.Select(&rows, args); err != nil {
+ log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+
+ _ = stmt.Close()
+
+ if len(rows) < 1 {
+ break
+ }
+
+ if checkpoint = onRows(rows); checkpoint == nil {
+ break
+ }
+
+ args["checkpoint"] = checkpoint
+ }
+}
+
+type progressBar struct {
+ *mpb.Bar
+
+ lastUpdate time.Time
+}
+
+// IncrBy does pb.Bar.DecoratorEwmaUpdate() automatically.
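+// Passing the time elapsed since the previous increment keeps the EWMA-based
+// ETA and speed decorators (see setupBar) fed with real timing samples.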
+func (pb *progressBar) IncrBy(n int) {
+ pb.Bar.IncrBy(n)
+
+ now := time.Now()
+
+ if !pb.lastUpdate.IsZero() {
+ pb.Bar.DecoratorEwmaUpdate(now.Sub(pb.lastUpdate))
+ }
+
+ pb.lastUpdate = now
+}
+
+// historyType specifies a history data type.
+type historyType struct {
+ // name is a human-readable common name.
+ name string
+ // idoTable specifies the source table.
+ idoTable string
+ // idoIdColumn specifies idoTable's primary key.
+ idoIdColumn string
+ // idoStartColumns specifies idoTable's event start time locations. (First non-NULL is used.)
+ idoStartColumns []string
+ // idoEndColumns specifies idoTable's event end time locations. (First non-NULL is used.)
+ idoEndColumns []string
+ // cacheSchema specifies the cache file's structure.
+ cacheSchema string
+ // cacheFiller fills cache from snapshot.
+ cacheFiller func(*historyType)
+ // cacheLimitQuery rationale: see migrate().
+ cacheLimitQuery string
+ // migrationQuery SELECTs source data for actual migration.
+ migrationQuery string
+ // migrate does the actual migration.
+ migrate func(c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, ht *historyType)
+
+ // cacheFile locates the cache file.
+ cacheFile string
+ // cache represents cacheFile.
+ cache *sqlx.DB
+ // snapshot represents the data source.
+ snapshot *sqlx.Tx
+ // fromId is the first IDO row ID to migrate.
+ fromId uint64
+ // toId is the last IDO row ID to migrate.
+ toId uint64
+ // total summarizes the source data.
+ total int64
+ // cacheTotal summarizes the cache source data.
+ cacheTotal int64
+ // done summarizes the migrated data.
+ done int64
+ // bar represents the current progress bar.
+ bar *progressBar
+ // lastId is the last already migrated ID.
+ lastId uint64
+}
+
+// setupBar (re-)initializes ht.bar.
+func (ht *historyType) setupBar(progress *mpb.Progress, total int64) {
+ ht.bar = &progressBar{Bar: progress.AddBar(
+ total,
+ mpb.BarFillerClearOnComplete(),
+ mpb.PrependDecorators(
+ decor.Name(ht.name, decor.WC{W: len(ht.name) + 1, C: decor.DidentRight}),
+ decor.Percentage(decor.WC{W: 5}),
+ ),
+ mpb.AppendDecorators(
+ decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{W: 4}),
+ decor.Name(" "),
+ decor.EwmaSpeed(0, "%.0f/s", 0, decor.WC{W: 4}),
+ ),
+ )}
+}
+
+type historyTypes []*historyType
+
+// forEach runs f for each history type in parallel.
+func (hts historyTypes) forEach(f func(*historyType)) {
+ eg, _ := errgroup.WithContext(context.Background())
+ for _, ht := range hts {
+ ht := ht
+ eg.Go(func() error {
+ f(ht)
+ return nil
+ })
+ }
+
+ _ = eg.Wait()
+}
+
+var types = historyTypes{
+ {
+ name: "ack & comment",
+ idoTable: "icinga_commenthistory",
+ idoIdColumn: "commenthistory_id",
+ idoStartColumns: []string{"entry_time"},
+ // The manual deletion time wins over the expiration time,
+ // which never occurs once a comment has been deleted manually.
+ idoEndColumns: []string{"deletion_time", "expiration_time"},
+ migrationQuery: commentMigrationQuery,
+ migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
+ migrateOneType(c, idb, envId, endpId, ht, convertCommentRows)
+ },
+ },
+ {
+ name: "downtime",
+ idoTable: "icinga_downtimehistory",
+ idoIdColumn: "downtimehistory_id",
+ // Fall back to scheduled time if actual time is missing.
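+ // (Per historyType's field docs: the first non-NULL column is used.)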
+ idoStartColumns: []string{"actual_start_time", "scheduled_start_time"}, + idoEndColumns: []string{"actual_end_time", "scheduled_end_time"}, + migrationQuery: downtimeMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) { + migrateOneType(c, idb, envId, endpId, ht, convertDowntimeRows) + }, + }, + { + name: "flapping", + idoTable: "icinga_flappinghistory", + idoIdColumn: "flappinghistory_id", + idoStartColumns: []string{"event_time"}, + idoEndColumns: []string{"event_time"}, + cacheSchema: eventTimeCacheSchema, + cacheFiller: func(ht *historyType) { + buildEventTimeCache(ht, []string{ + "xh.flappinghistory_id id", "UNIX_TIMESTAMP(xh.event_time) event_time", + "xh.event_time_usec", "1001-xh.event_type event_is_start", "xh.object_id", + }) + }, + migrationQuery: flappingMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) { + migrateOneType(c, idb, envId, endpId, ht, convertFlappingRows) + }, + }, + { + name: "notification", + idoTable: "icinga_notifications", + idoIdColumn: "notification_id", + idoStartColumns: []string{"start_time"}, + idoEndColumns: []string{"end_time"}, + cacheSchema: previousHardStateCacheSchema, + cacheFiller: func(ht *historyType) { + buildPreviousHardStateCache(ht, []string{ + "xh.notification_id id", "xh.object_id", "xh.state last_hard_state", + }) + }, + cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state", + migrationQuery: notificationMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) { + migrateOneType(c, idb, envId, endpId, ht, convertNotificationRows) + }, + }, + { + name: "state", + idoTable: "icinga_statehistory", + idoIdColumn: "statehistory_id", + idoStartColumns: []string{"state_time"}, + idoEndColumns: []string{"state_time"}, + cacheSchema: previousHardStateCacheSchema, + cacheFiller: func(ht *historyType) { + buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"}) + }, + cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state", + migrationQuery: stateMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) { + migrateOneType(c, idb, envId, endpId, ht, convertStateRows) + }, + }, +} diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index d778c4124..29b7c4440 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -6,7 +6,6 @@ import ( "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal/command" "github.com/icinga/icingadb/pkg/common" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/history" "github.com/icinga/icingadb/pkg/icingadb/overdue" @@ -29,11 +28,9 @@ import ( ) const ( - ExitSuccess = 0 - ExitFailure = 1 - expectedRedisSchemaVersion = "5" - expectedMysqlSchemaVersion = 3 - expectedPostgresSchemaVersion = 1 + ExitSuccess = 0 + ExitFailure = 1 + expectedRedisSchemaVersion = "5" ) func main() { @@ -74,7 +71,7 @@ func run() int { } } - if err := checkDbSchema(context.Background(), db); err != nil { + if err := db.CheckSchema(context.Background()); err != nil { logger.Fatalf("%+v", err) } @@ -358,36 +355,6 @@ func run() int { } } -// checkDbSchema asserts the database schema of the expected version being present. 
-func checkDbSchema(ctx context.Context, db *icingadb.DB) error {
- var expectedDbSchemaVersion uint16
- switch db.DriverName() {
- case driver.MySQL:
- expectedDbSchemaVersion = expectedMysqlSchemaVersion
- case driver.PostgreSQL:
- expectedDbSchemaVersion = expectedPostgresSchemaVersion
- }
-
- var version uint16
-
- err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version)
- if err != nil {
- return errors.Wrap(err, "can't check database schema version")
- }
-
- if version != expectedDbSchemaVersion {
- // Since these error messages are trivial and mostly caused by users, we don't need
- // to print a stack trace here. However, since errors.Errorf() does this automatically,
- // we need to use fmt instead.
- return fmt.Errorf(
- "unexpected database schema version: v%d (expected v%d), please make sure you have applied all database"+
- " migrations after upgrading Icinga DB", version, expectedDbSchemaVersion,
- )
- }
-
- return nil
-}
-
 // monitorRedisSchema monitors rc's icinga:schema version validity.
 func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) {
 for {
diff --git a/doc/06-Migration.md b/doc/06-Migration.md
new file mode 100644
index 000000000..4a799ba8e
--- /dev/null
+++ b/doc/06-Migration.md
@@ -0,0 +1,92 @@
+# Migration from IDO
+
+The Icinga DB Migration command-line tool migrates history data from [IDO] to
+Icinga DB. More precisely: from the IDO SQL database to the Icinga DB one.
+
+!!! info
+
+    Everything else is already populated by Icinga DB itself.
+    Only the past history data of existing IDO setups
+    isn't known to Icinga DB without migration from IDO.
+
+## Icinga DB
+
+1. Make sure Icinga DB is up, running, and writing to its database.
+2. Optionally disable Icinga 2's IDO feature.
+
+!!! warning
+
+    Migration will cause duplicate Icinga DB events
+    for the period both IDO and Icinga DB are active.
+    Read on; there is a way to avoid that.
+
+## Configuration file
+
+Create a YAML file like this somewhere:
+
+```yaml
+icinga2:
+  # Content of /var/lib/icinga2/icingadb.env
+  env: "da39a3ee5e6b4b0d3255bfef95601890afBADHEX"
+  # Name of the main Icinga 2 endpoint writing to IDO
+  endpoint: master-1
+# IDO database
+ido:
+  type: pgsql
+  host: 192.0.2.1
+  port: 5432
+  database: icinga
+  user: icinga
+  password: CHANGEME
+  # Input time range
+  #from: 0
+  #to: 2147483647
+# Icinga DB database
+icingadb:
+  type: mysql
+  host: 2001:db8::1
+  port: 3306
+  database: icingadb
+  user: icingadb
+  password: CHANGEME
+```
+
+### Input time range
+
+By default, everything is migrated. If you wish, you can restrict the input
+data's start and/or end by giving `from` and/or `to` under `ido:` as Unix
+timestamps (in seconds).
+
+Examples:
+
+* Now: Run in a shell: `date +%s`
+* One year ago: Run in a shell: `date -d -1year +%s`
+* Icinga DB usage start time: Query the Icinga DB database:
+  `SELECT MIN(event_time)/1000 FROM history;`
+
+The latter is useful as the range end (`to`) to avoid the duplicate events
+mentioned above.
+
+## Cache directory
+
+Choose a directory for Icinga DB Migration's internal cache; it doesn't
+have to exist yet. If there isn't much to migrate, or the migration process
+won't be interrupted by a reboot (of the machine the Icinga DB migration or
+its database runs on), `mktemp -d` is enough.
+
+## Actual migration
+
+Run:
+
+```shell
+icingadb-migrate -c icingadb-migration.yml -t ~/icingadb-migration.cache
+```
+
+In case of an interruption, just re-run; the migration continues from its
+last checkpoint.
+
+!!! tip
+
+    If there is much to migrate, use e.g.
tmux to + protect yourself against SSH connection losses. + + +[IDO]: https://icinga.com/docs/icinga-2/latest/doc/14-features/#ido-database-db-ido diff --git a/go.mod b/go.mod index cec21d7c2..7e45b8fce 100644 --- a/go.mod +++ b/go.mod @@ -11,16 +11,20 @@ require ( github.com/jessevdk/go-flags v1.5.0 github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.10.6 + github.com/mattn/go-sqlite3 v1.14.6 github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd github.com/pkg/errors v0.9.1 github.com/ssgreg/journald v1.0.0 github.com/stretchr/testify v1.8.0 + github.com/vbauerster/mpb/v6 v6.0.4 go.uber.org/zap v1.21.0 golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) require ( + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/benbjohnson/clock v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -28,7 +32,9 @@ require ( github.com/fatih/color v1.10.0 // indirect github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-isatty v0.0.12 // indirect + github.com/mattn/go-runewidth v0.0.12 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect diff --git a/go.sum b/go.sum index 593de8b20..4339a8557 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -45,6 +49,8 @@ github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-runewidth v0.0.12 h1:Y41i/hVW3Pgwr8gV+J23B9YEY0zxjptBuCWEaxmAOow= +github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -57,6 +63,9 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= 
+github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/ssgreg/journald v1.0.0 h1:0YmTDPJXxcWDPba12qNMdO6TxvfkFSYpFIJ31CwmLcU=
github.com/ssgreg/journald v1.0.0/go.mod h1:RUckwmTM8ghGWPslq2+ZBZzbb9/2KgjzYZ4JEP+oRt0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -67,6 +76,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/vbauerster/mpb/v6 v6.0.4 h1:h6J5zM/2wimP5Hj00unQuV8qbo5EPcj6wbkCqgj7KcY=
+github.com/vbauerster/mpb/v6 v6.0.4/go.mod h1:a/+JT57gqh6Du0Ay5jSR+uBMfXGdlR7VQlGP52fJxLM=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
@@ -100,6 +111,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go
index 33cbb9900..a9eed7fec 100644
--- a/pkg/icingadb/db.go
+++ b/pkg/icingadb/db.go
@@ -87,6 +87,41 @@ func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
 }
}
+const (
+ expectedMysqlSchemaVersion = 3
+ expectedPostgresSchemaVersion = 1
+)
+
+// CheckSchema asserts that the expected database schema version is present.
+func (db *DB) CheckSchema(ctx context.Context) error {
+ var expectedDbSchemaVersion uint16
+ switch db.DriverName() {
+ case driver.MySQL:
+ expectedDbSchemaVersion = expectedMysqlSchemaVersion
+ case driver.PostgreSQL:
+ expectedDbSchemaVersion = expectedPostgresSchemaVersion
+ }
+
+ var version uint16
+
+ err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version)
+ if err != nil {
+ return errors.Wrap(err, "can't check database schema version")
+ }
+
+ if version != expectedDbSchemaVersion {
+ // Since these error messages are trivial and mostly caused by users, we don't need
+ // to print a stack trace here. However, since errors.Errorf() does this automatically,
+ // we need to use fmt instead.
+ return fmt.Errorf(
+ "unexpected database schema version: v%d (expected v%d), please make sure you have applied all database"+
+ " migrations after upgrading Icinga DB", version, expectedDbSchemaVersion,
+ )
+ }
+
+ return nil
+}
+
 // BuildColumns returns all columns of the given struct.
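+// The column names come from db.Mapper's struct-field mapping.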
func (db *DB) BuildColumns(subject interface{}) []string { fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names @@ -131,7 +166,7 @@ func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) { switch db.DriverName() { case driver.MySQL: // MySQL treats UPDATE id = id as a no-op. - clause = "ON DUPLICATE KEY UPDATE id = id" + clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0]) case driver.PostgreSQL: clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table) } @@ -529,6 +564,28 @@ func (db *DB) CreateStreamed( ) } +// CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateIgnoreStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, placeholders := db.BuildInsertIgnoreStmt(first) + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., + ) +} + // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and diff --git a/pkg/icingadb/v1/history/notification.go b/pkg/icingadb/v1/history/notification.go index 1ebc27aa0..17fd375ca 100644 --- a/pkg/icingadb/v1/history/notification.go +++ b/pkg/icingadb/v1/history/notification.go @@ -15,7 +15,7 @@ type NotificationHistory struct { State uint8 `json:"state"` PreviousHardState uint8 `json:"previous_hard_state"` Author string `json:"author"` - Text string `json:"text"` + Text types.String `json:"text"` UsersNotified uint16 `json:"users_notified"` } diff --git a/tests/go.sum b/tests/go.sum index 190631a7e..964ea7fae 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -66,6 +66,8 @@ github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBK github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -465,6 +467,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.12 
h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= @@ -584,6 +587,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -642,6 +647,8 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/vbauerster/mpb/v6 v6.0.4 h1:h6J5zM/2wimP5Hj00unQuV8qbo5EPcj6wbkCqgj7KcY= +github.com/vbauerster/mpb/v6 v6.0.4/go.mod h1:a/+JT57gqh6Du0Ay5jSR+uBMfXGdlR7VQlGP52fJxLM= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= @@ -857,6 +864,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=