Merged
103 commits
3c1a83e
Outsource cmd/icingadb.Exit* to cmd/internal
Al2Klimov Aug 3, 2021
700f88a
cmd/ido2icingadb: parse CLI flags
Al2Klimov Aug 3, 2021
dce3889
cmd/ido2icingadb: parse config file
Al2Klimov Aug 3, 2021
890b66d
cmd/ido2icingadb: connect to databases
Al2Klimov Aug 3, 2021
4eb2ba1
cmd/ido2icingadb: count total IDO events
Al2Klimov Aug 3, 2021
05c736a
.../compliance/check-licenses.sh: also check UNLICENSE
Al2Klimov Dec 7, 2021
2f5d5ea
cmd/ido2icingadb: compute previous progress
Al2Klimov Aug 20, 2021
495e0af
cmd/ido2icingadb: deduplicate parallelism
Al2Klimov Aug 20, 2021
1141058
cmd/ido2icingadb: split main()
Al2Klimov Aug 20, 2021
1d111b7
cmd/ido2icingadb: prepare cache
Al2Klimov Aug 23, 2021
b005b4c
cmd/ido2icingadb: make log a global var
Al2Klimov Aug 23, 2021
d9381ec
cmd/ido2icingadb: separate cache schema
Al2Klimov Aug 24, 2021
4de76da
cmd/ido2icingadb: introduce historyType#setupBar()
Al2Klimov Aug 24, 2021
d9d8fb7
cmd/ido2icingadb: make bulk a const
Al2Klimov Aug 24, 2021
423c948
cmd/ido2icingadb: introduce barIncrementer
Al2Klimov Aug 24, 2021
63ab8d9
cmd/ido2icingadb: support SELECT xh.foo_bar->FooBar int
Al2Klimov Aug 24, 2021
0704750
cmd/ido2icingadb: generalize cache schemata
Al2Klimov Aug 25, 2021
6ae22d5
cmd/ido2icingadb: fill cache
Al2Klimov Aug 25, 2021
6b58324
cmd/ido2icingadb: chunk all large queries
Al2Klimov Sep 6, 2021
5396afe
cmd/ido2icingadb: actually migrate
Al2Klimov Oct 8, 2021
517fdcb
cmd/ido2icingadb: correct log levels
Al2Klimov Sep 30, 2021
ae57d4f
cmd/ido2icingadb: reduce IDE warnings
Al2Klimov Sep 30, 2021
d9e2c7d
cmd/ido2icingadb: reason not actually used INNER JOINs
Al2Klimov Oct 5, 2021
87a6f8e
cmd/ido2icingadb: shorten SugaredLogger#With().Info() to #Infow()
Al2Klimov Oct 5, 2021
2111533
cmd/ido2icingadb: document sliceIdoHistory()
Al2Klimov Oct 5, 2021
31e7c38
cmd/ido2icingadb: types: add keys and delete zero values
Al2Klimov Oct 5, 2021
7d2ea6c
cmd/ido2icingadb: don't re-invent sqlx.In()
Al2Klimov Oct 5, 2021
f4a2741
cmd/ido2icingadb: remove redundant function
Al2Klimov Oct 6, 2021
f63892e
Revert "Outsource cmd/icingadb.Exit* to cmd/internal"
Al2Klimov Oct 8, 2021
3ca0b49
cmd/ido2icingadb: document main.go
Al2Klimov Oct 6, 2021
7e0d3d5
cmd/ido2icingadb: document cache.go
Al2Klimov Oct 8, 2021
662744e
cmd/ido2icingadb: migrate(): UPDATE after REPLACE
Al2Klimov Oct 8, 2021
4fe3134
cmd/ido2icingadb: document misc.go
Al2Klimov Oct 8, 2021
77fa207
cmd/ido2icingadb: simplify unnecessarily complex datatype
Al2Klimov Oct 11, 2021
068052f
cmd/ido2icingadb: migrate(): don't REPLACE, but upsert (one by one)
Al2Klimov Oct 12, 2021
6e7e217
cmd/ido2icingadb: sliceIdoHistory(): take named args
Al2Klimov Oct 12, 2021
bc5ad21
cmd/ido2icingadb: simplify computeProgress() a lot
Al2Klimov Dec 2, 2021
2c23b14
cmd/ido2icingadb: resume migration progress bar where interrupted
Al2Klimov Dec 2, 2021
b78b15e
cmd/ido2icingadb: ensure migration progress bar finishes
Al2Klimov Dec 2, 2021
34b3ee1
cmd/ido2icingadb: merge countIdoHistory() and computeProgress()
Al2Klimov Dec 2, 2021
4e58631
cmd/ido2icingadb: make IDs UUID -> SHA1
Al2Klimov Dec 2, 2021
163c862
cmd/ido2icingadb: env: "Environment" config constant value -> hex ID
Al2Klimov Dec 2, 2021
086b3d3
cmd/ido2icingadb: sliceIdoHistory(): re-use memory
Al2Klimov Dec 3, 2021
2d2cad1
cmd/ido2icingadb: sliceIdoHistory(): reduce onRows call complexity
Al2Klimov Dec 3, 2021
597cd63
cmd/ido2icingadb: reduce historyType#convertRows call complexity
Al2Klimov Dec 3, 2021
a2dae38
cmd/ido2icingadb: build fix
Al2Klimov Dec 3, 2021
aa585e9
cmd/ido2icingadb: don't hash non-hashable types
Al2Klimov Mar 1, 2022
bcf2256
cmd/ido2icingadb: correct mismatching FK ID
Al2Klimov Mar 1, 2022
d9a3e04
cmd/ido2icingadb: migrate(): run only one transaction at a time
Al2Klimov Mar 10, 2022
581270f
cmd/ido2icingadb: support Postgres
Al2Klimov Mar 17, 2022
f522b29
cmd/ido2icingadb: prefer generics over reflection
Al2Klimov Apr 4, 2022
7dd911d
cmd/ido2icingadb: better ETA
Al2Klimov Apr 6, 2022
3b634ec
cmd/ido2icingadb: build fix
Al2Klimov May 12, 2022
67c7fe9
cmd/ido2icingadb: go:embed schemata
Al2Klimov May 12, 2022
5470c29
cmd/ido2icingadb: go:embed large queries
Al2Klimov May 19, 2022
7c6f9dd
cmd/ido2icingadb: improve comments
Al2Klimov May 19, 2022
041d4a0
cmd/ido2icingadb: avoid &(*ht)[i]
Al2Klimov May 19, 2022
1a1f191
cmd/ido2icingadb: write SLA
Al2Klimov May 19, 2022
3cc8641
cmd/ido2icingadb: let SQLite VACUUM automatically
Al2Klimov Jun 13, 2022
ad0b6ca
cmd/ido2icingadb: remove obsolete code
Al2Klimov Jun 13, 2022
0e9c93c
cmd/ido2icingadb: rename onNewUncommittedDml to commitPeriodically
Al2Klimov Jun 13, 2022
f40a394
cmd/ido2icingadb: don't unnecessarily pre-fill ido_migration_progress
Al2Klimov Jun 20, 2022
752a909
cmd/ido2icingadb: improve code docs
Al2Klimov Jun 20, 2022
d9eb368
cmd/ido2icingadb: fix downtime SLA end time
Al2Klimov Jun 20, 2022
4f871ec
cmd/ido2icingadb: bulk, not prepare, upserts
Al2Klimov Jun 22, 2022
fe24c2d
cmd/ido2icingadb: fix missing config defaults
Al2Klimov Jun 22, 2022
f944844
cmd/ido2icingadb: build fix
Al2Klimov Jul 5, 2022
7896bc5
cmd/ido2icingadb: fix missing flapping end events
Al2Klimov Jul 5, 2022
90bc748
cmd/ido2icingadb: indicate no notification author as "", not "-"
Al2Klimov Jul 5, 2022
9601b16
cmd/ido2icingadb: fix missing ack clearings
Al2Klimov Jul 13, 2022
d69ccd1
cmd/ido2icingadb: don't unnecessarily overwrite already written/migra…
Al2Klimov Jul 13, 2022
74fce9b
cmd/ido2icingadb: fix missing flapping_history#percent_state_change_end
Al2Klimov Jul 13, 2022
1caca0b
cmd/ido2icingadb: fix duplicate comment
Al2Klimov Jul 13, 2022
98bb53b
cmd/ido2icingadb: clean up cache
Al2Klimov Jul 19, 2022
f063687
DB#BuildInsertIgnoreStmt(): handle primary key being not "id"
Al2Klimov Jul 15, 2022
cc98c34
cmd/ido2icingadb: centralise notification type conversion
Al2Klimov Jul 20, 2022
58cfbf4
cmd/ido2icingadb: allow converters to upsert
Al2Klimov Jul 20, 2022
34ef6be
cmd/ido2icingadb: upsert, not update, to make bulk statements
Al2Klimov Jul 20, 2022
fcae675
cmd/ido2icingadb: remove unused code
Al2Klimov Jul 20, 2022
59c77eb
cmd/ido2icingadb: reduce bulk size
Al2Klimov Jul 26, 2022
f0ec8b0
cmd/ido2icingadb: allow to migrate multiple IDO databases
Al2Klimov Jul 28, 2022
1c381cf
cmd/ido2icingadb: make cache filling fast again
Al2Klimov Jul 28, 2022
36d07aa
cmd/ido2icingadb: add docs
Al2Klimov Jul 5, 2022
8bf8a6f
cmd/ido2icingadb: rename to Icinga DB Migration
Al2Klimov Aug 3, 2022
aa571f0
cmd/ido2icingadb: remove unnecessary mutex
Al2Klimov Aug 5, 2022
690fcfa
cmd/ido2icingadb: chunkCacheTx(): commit less often
Al2Klimov Aug 5, 2022
23130d7
cmd/ido2icingadb: remove unnecessary transactions
Al2Klimov Aug 11, 2022
adcd004
Introduce DB#CreateIgnoreStreamed()
Al2Klimov Aug 19, 2022
6804bbd
cmd/ido2icingadb: increase parallelism
Al2Klimov Aug 19, 2022
6794252
cmd/ido2icingadb: remove unused code
Al2Klimov Aug 19, 2022
8402eb2
cmd/ido2icingadb: show ops/s
Al2Klimov Aug 19, 2022
dbf394f
cmd/ido2icingadb: make sure not to return false-positive nil (i.e. EO…
Al2Klimov Aug 24, 2022
2c0e927
cmd/ido2icingadb: allow custom input data time range
Al2Klimov Sep 6, 2022
7ff52c5
cmd/ido2icingadb: handle custom input data time range changes
Al2Klimov Sep 7, 2022
f7d132c
Make checkDbSchema() reusable as DB#CheckSchema()
Al2Klimov Oct 11, 2022
f233c63
cmd/ido2icingadb: assert schema
Al2Klimov Oct 11, 2022
5ebb2b0
cmd/ido2icingadb: Keep It Simple Stupid
Al2Klimov Oct 11, 2022
016a5dd
cmd/ido2icingadb: don't tell how to mess up everything
Al2Klimov Oct 11, 2022
649f7e0
cmd/ido2icingadb: build cache from [0,toId], not just [fromId,toId]
Al2Klimov Oct 14, 2022
70a0bfb
cmd/ido2icingadb: use built-in decorators for the progress bar
Al2Klimov Oct 24, 2022
0ff1728
cmd/ido2icingadb: move docs to the main tree
Al2Klimov Oct 25, 2022
6a5f3fd
cmd/: rename ido2icingadb/ to icingadb-migrate/
Al2Klimov Oct 26, 2022
c51d767
Migration: docs enhancements
Al2Klimov Oct 26, 2022
4 changes: 3 additions & 1 deletion .github/workflows/compliance/check-licenses.sh
@@ -5,7 +5,7 @@ set -eo pipefail
find_license_file() {
MOD_NAME="$1"
LICENSE_DIR="vendor/$MOD_NAME"
-LICENSE_FILES=({,../}LICENSE{,.txt,.md})
+LICENSE_FILES=({,../}{,UN}LICENSE{,.txt,.md})

for LICENSE_FILE in "${LICENSE_FILES[@]}"; do
LICENSE_FILE="${LICENSE_DIR}/$LICENSE_FILE"
@@ -29,6 +29,8 @@ list_all_deps() {
COMPATIBLE_LINE=$(($LINENO + 2))

COMPATIBLE=(
+# public domain
+3cee2c43614ad4572d9d594c81b9348cf45ed5ac # vendor/github.com/vbauerster/mpb/v6/UNLICENSE
# MIT
66d504eb2f162b9cbf11b07506eeed90c6edabe1 # vendor/github.com/cespare/xxhash/v2/LICENSE.txt
1513ff663e946fdcadb630bed670d253b8b22e1e # vendor/github.com/davecgh/go-spew/spew/../LICENSE
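For reference, the new pattern {,../}{,UN}LICENSE{,.txt,.md} brace-expands to twelve candidate paths per module. A minimal Go sketch, purely illustrative (the script itself stays bash), that enumerates the same candidates in expansion order:

package main

import "fmt"

// Enumerate the candidates produced by the bash brace expansion
// {,../}{,UN}LICENSE{,.txt,.md}, in the same order bash would.
func main() {
	for _, dir := range []string{"", "../"} {
		for _, name := range []string{"LICENSE", "UNLICENSE"} {
			for _, ext := range []string{"", ".txt", ".md"} {
				fmt.Println(dir + name + ext)
			}
		}
	}
}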
298 changes: 298 additions & 0 deletions cmd/icingadb-migrate/cache.go
@@ -0,0 +1,298 @@
package main

import (
"database/sql"
_ "embed"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"math"
"strings"
"time"
)

//go:embed embed/event_time_cache_schema.sql
var eventTimeCacheSchema string

//go:embed embed/previous_hard_state_cache_schema.sql
var previousHardStateCacheSchema string

// buildEventTimeCache rationale:
//
// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
// That would make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
// and cache it into an SQLite database. Then stream from that database and the IDO.
//
// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
func buildEventTimeCache(ht *historyType, idoColumns []string) {
type row = struct {
Id uint64
EventTime int64
EventTimeUsec uint32
EventIsStart uint8
ObjectId uint64
}

chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
var checkpoint struct {
Cnt int64
MaxId sql.NullInt64
}
cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")

ht.bar.SetCurrent(checkpoint.Cnt * 2)

// Stream source data...
sliceIdoHistory(
ht,
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" <= :toid AND xh."+
ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
func(idoRows []row) (checkpoint interface{}) {
for _, idoRow := range idoRows {
if idoRow.EventIsStart == 0 {
// Ack/flapping end event. Get the start event time:
var lst []struct {
EventTime int64
EventTimeUsec uint32
}
cacheSelect(
*tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
idoRow.ObjectId,
)

// If we have that, ...
if len(lst) > 0 {
// ... save the start event time for the actual migration:
cacheExec(
*tx,
"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
)

// This previously queried info isn't needed anymore.
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
}
} else {
// Ack/flapping start event directly after another start event (per checkable).
// The old one won't have (but the new one will) an end event (which will need its time).
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)

// An ack/flapping start event. The following end event (per checkable) will need its time.
cacheExec(
*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
)
}

commitPeriodically()
checkpoint = idoRow.Id
}

ht.bar.IncrBy(len(idoRows))
return
},
)

// Any start events still cached here never got an end event and aren't needed anymore.
cacheExec(*tx, "DELETE FROM last_start_time")
})

ht.bar.SetTotal(ht.bar.Current(), true)
}
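The two SQLite tables implement what is conceptually a per-checkable pairing of start and end events. A minimal in-memory sketch of the same algorithm (hypothetical histEvent/eventTime types; events assumed sorted by ascending ID, which is the order sliceIdoHistory delivers):

type histEvent struct {
	Id, ObjectId  uint64
	EventTime     int64
	EventTimeUsec uint32
	EventIsStart  uint8
}

type eventTime struct {
	Time int64
	Usec uint32
}

// pairStartTimes maps each end event's history ID to the time of its most
// recent start event, mirroring last_start_time/end_start_time above.
func pairStartTimes(events []histEvent) map[uint64]eventTime {
	lastStart := map[uint64]eventTime{} // like table last_start_time
	endStart := map[uint64]eventTime{}  // like table end_start_time

	for _, ev := range events {
		if ev.EventIsStart == 0 {
			if t, ok := lastStart[ev.ObjectId]; ok {
				endStart[ev.Id] = t            // start time for this end event
				delete(lastStart, ev.ObjectId) // not needed anymore
			}
		} else {
			// A new start event supersedes any unpaired previous one (per checkable).
			lastStart[ev.ObjectId] = eventTime{ev.EventTime, ev.EventTimeUsec}
		}
	}

	return endStart
}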

// buildPreviousHardStateCache rationale:
//
// Icinga DB's state_history#previous_hard_state would need a subquery.
// That would make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
// and cache it into an SQLite database. Then stream from that database and the IDO.
//
// Similar for notifications. (On non-recoverable errors the whole program exits.)
func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
type row = struct {
Id uint64
ObjectId uint64
LastHardState uint8
}

chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
var nextIds struct {
Cnt int64
MinId sql.NullInt64
}
cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")

var previousHardStateCnt int64
cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")

var checkpoint int64
if nextIds.MinId.Valid { // there are next_ids
checkpoint = nextIds.MinId.Int64 // this kind of cache is filled in descending order
} else { // there aren't any next_ids
// next_ids contains the most recently processed IDs and is only empty if...
if previousHardStateCnt == 0 {
// ... we didn't actually start yet...
checkpoint = math.MaxInt64 // start from the largest (possible) ID
} else {
// ... or we've already finished.
checkpoint = 0 // make the following query a no-op
}
}

ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)

// We continue where we finished before. As we build the cache in reverse chronological order:
// 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
// a. Start migration after Icinga DB is up and running.
// b. Remove the cache before the next migration trial.
// 2. If the history gets cleaned up between two migration trials,
// the difference either just doesn't appear in the cache or - if already there - will be ignored later.

// Stream source data...
sliceIdoHistory(
ht,
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" <= :toid AND xh."+
ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
nil, checkpoint, // ... since we were interrupted:
func(idoRows []row) (checkpoint interface{}) {
for _, idoRow := range idoRows {
var nhs []struct{ NextHardState uint8 }
cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)

if len(nhs) < 1 { // we just started (per checkable)
// At the moment (we're "travelling back in time") that's the checkable's hard state:
cacheExec(
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
idoRow.ObjectId, idoRow.LastHardState,
)

// But for the current time point the previous hard state isn't known yet:
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
} else if idoRow.LastHardState == nhs[0].NextHardState {
// The hard state didn't change yet (per checkable),
// so this time point also awaits the previous hard state.
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
} else { // the hard state changed (per checkable)
// That past hard state is now available for the processed future time points:
cacheExec(
*tx,
"INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
"SELECT history_id, ? FROM next_ids WHERE object_id=?",
idoRow.LastHardState, idoRow.ObjectId,
)

// Now they have what they wanted:
cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)

// That's done.
// Now do the same thing as in the "we just started" case above, for the same reason:

cacheExec(
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
idoRow.ObjectId, idoRow.LastHardState,
)

cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
}

commitPeriodically()
checkpoint = idoRow.Id
}

ht.bar.IncrBy(len(idoRows))
return
},
)

// No past hard state is available for the processed future time points, assuming pending:
cacheExec(
*tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
)

// Now they should have what they wanted:
cacheExec(*tx, "DELETE FROM next_hard_state")
cacheExec(*tx, "DELETE FROM next_ids")
})

ht.bar.SetTotal(ht.bar.Current(), true)
}
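Conceptually, this reverse scan resolves, for every state history row, the hard state the checkable had just before that row. A minimal in-memory sketch (hypothetical stateEvent type; 99 stands for "pending", as in the fallback INSERT above):

type stateEvent struct {
	Id, ObjectId  uint64
	LastHardState uint8
}

// previousHardStates walks events newest-first and yields, per history ID, the
// checkable's previous hard state, mirroring the SQLite tables
// next_hard_state/next_ids/previous_hard_state above.
func previousHardStates(eventsDesc []stateEvent) map[uint64]uint8 {
	nextHardState := map[uint64]uint8{} // like table next_hard_state
	nextIds := map[uint64][]uint64{}    // like table next_ids, grouped by object
	result := map[uint64]uint8{}        // like table previous_hard_state

	for _, ev := range eventsDesc { // descending by ev.Id
		if nhs, ok := nextHardState[ev.ObjectId]; ok && ev.LastHardState != nhs {
			// Hard state change: this row's state is the "previous hard state"
			// of all younger rows collected so far for this checkable.
			for _, id := range nextIds[ev.ObjectId] {
				result[id] = ev.LastHardState
			}
			delete(nextIds, ev.ObjectId)
		}

		nextHardState[ev.ObjectId] = ev.LastHardState
		nextIds[ev.ObjectId] = append(nextIds[ev.ObjectId], ev.Id)
	}

	// Rows older than any recorded state change get "pending" (99).
	for _, ids := range nextIds {
		for _, id := range ids {
			result[id] = 99
		}
	}

	return result
}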

// chunkCacheTx rationale: inside do, operate on the cache via *tx. After every completed operation,
// call commitPeriodically(), which from time to time commits *tx and begins a new transaction.
// (That's why tx is a **, not just a *.) (On non-recoverable errors the whole program exits.)
func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
logger := log.With("backend", "cache")

tx, err := cache.Beginx()
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}

const commitInterval = 5 * time.Minute
nextCommit := time.Now().Add(commitInterval)

do(&tx, func() { // commitPeriodically
if now := time.Now(); now.After(nextCommit) {
if err := tx.Commit(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}

var err error

tx, err = cache.Beginx()
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}

nextCommit = nextCommit.Add(commitInterval)
}
})

if err := tx.Commit(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}
}
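A typical caller looks like this (minimal sketch; the table t is hypothetical). Note that do must always dereference tx freshly, since commitPeriodically may have replaced the transaction:

chunkCacheTx(cache, func(tx **sqlx.Tx, commitPeriodically func()) {
	for i := 0; i < 100000; i++ {
		cacheExec(*tx, "INSERT INTO t(id) VALUES (?)", i) // *tx: the current transaction
		commitPeriodically() // may commit *tx and swap in a new one
	}
})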

// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
func cacheGet(cache interface {
Get(dest interface{}, query string, args ...interface{}) error
}, dest interface{}, query string, args ...interface{}) {
if err := cache.Get(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}

// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
if err := cacheTx.Select(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}

// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
if _, err := cacheTx.Exec(dml, args...); err != nil {
log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}