Skip to content

Commit 2717c61

Browse files
committed
wip
1 parent efec262 commit 2717c61

File tree

7 files changed

+217
-6
lines changed

7 files changed

+217
-6
lines changed

pkg/icingadb/db.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
506506
// YieldAll executes the query with the supplied scope,
507507
// scans each resulting row into an entity returned by the factory function,
508508
// and streams them into a returned channel.
509-
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) {
509+
func (db *DB) YieldAll(
510+
ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, namedQueryParams bool, scope ...interface{},
511+
) (<-chan contracts.Entity, <-chan error) {
510512
entities := make(chan contracts.Entity, 1)
511513
g, ctx := errgroup.WithContext(ctx)
512514

@@ -515,7 +517,14 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
515517
defer db.log(ctx, query, &counter).Stop()
516518
defer close(entities)
517519

518-
rows, err := db.NamedQueryContext(ctx, query, scope)
520+
var rows *sqlx.Rows
521+
var err error
522+
if namedQueryParams {
523+
rows, err = db.NamedQueryContext(ctx, query, scope)
524+
} else {
525+
rows, err = db.QueryxContext(ctx, query, scope...)
526+
}
527+
519528
if err != nil {
520529
return internal.CantPerformQuery(err, query)
521530
}

pkg/icingadb/sla.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package icingadb
2+
3+
import (
4+
"context"
5+
"github.com/icinga/icingadb/pkg/contracts"
6+
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
7+
"github.com/icinga/icingadb/pkg/types"
8+
"golang.org/x/sync/errgroup"
9+
"time"
10+
)
11+
12+
type SlaHistoryTrail struct {
13+
Id types.Int `json:"id" db:"-"`
14+
v1.EnvironmentMeta `json:",inline"`
15+
HostId types.Binary `json:"host_id"`
16+
ServiceId types.Binary `json:"service_id"`
17+
EventType string `json:"event_type"`
18+
EventTime types.UnixMilli `json:"event_time"`
19+
}
20+
21+
// Fingerprint implements the contracts.Fingerprinter interface.
22+
func (sht SlaHistoryTrail) Fingerprint() contracts.Fingerprinter {
23+
return sht
24+
}
25+
26+
// ID implements part of the contracts.IDer interface.
27+
func (sht SlaHistoryTrail) ID() contracts.ID {
28+
return sht.Id
29+
}
30+
31+
// SetID implements part of the contracts.IDer interface.
32+
func (sht *SlaHistoryTrail) SetID(id contracts.ID) {
33+
sht.Id = id.(types.Int)
34+
}
35+
36+
type SlaHostHistoryTrailColumns struct {
37+
v1.EntityWithoutChecksum `json:",inline"`
38+
v1.EnvironmentMeta `json:",inline"`
39+
}
40+
41+
type SlaServiceHistoryTrailColumns struct {
42+
v1.EntityWithoutChecksum `json:",inline"`
43+
v1.EnvironmentMeta `json:",inline"`
44+
HostId types.Binary `json:"host_id"`
45+
}
46+
47+
func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity {
48+
entities := make(chan contracts.Entity, 1)
49+
50+
g.Go(func() error {
51+
defer close(entities)
52+
53+
for {
54+
select {
55+
case checkable, ok := <-checkables:
56+
if !ok {
57+
return nil
58+
}
59+
60+
entity := &SlaHistoryTrail{
61+
EventTime: types.UnixMilli(time.Now()),
62+
EventType: eventType,
63+
}
64+
65+
switch ptr := checkable.(type) {
66+
case *v1.Host:
67+
entity.HostId = ptr.Id
68+
entity.EnvironmentId = ptr.EnvironmentId
69+
case *v1.Service:
70+
entity.HostId = ptr.HostId
71+
entity.ServiceId = ptr.Id
72+
entity.EnvironmentId = ptr.EnvironmentId
73+
}
74+
75+
entities <- entity
76+
case <-ctx.Done():
77+
return ctx.Err()
78+
}
79+
}
80+
})
81+
82+
return entities
83+
}
84+
85+
var (
86+
_ contracts.Entity = (*SlaHistoryTrail)(nil)
87+
)

pkg/icingadb/sync.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.uber.org/zap"
1616
"golang.org/x/sync/errgroup"
1717
"runtime"
18+
"strings"
1819
"time"
1920
)
2021

@@ -85,7 +86,8 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {
8586

8687
actual, dbErrs := s.db.YieldAll(
8788
ctx, subject.FactoryForDelta(),
88-
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(),
89+
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()),
90+
true, e.Meta(),
8991
)
9092
// Let errors from DB cancel our group.
9193
com.ErrgroupReceive(g, dbErrs)
@@ -128,9 +130,31 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
128130
entities = delta.Create.Entities(ctx)
129131
}
130132

133+
var slaTrailEntities chan contracts.Entity
134+
onSuccessHandlers := []OnSuccess[contracts.Entity]{
135+
OnSuccessIncrement[contracts.Entity](stat),
136+
}
137+
138+
switch delta.Subject.Entity().(type) {
139+
case *v1.Host, *v1.Service:
140+
slaTrailEntities = make(chan contracts.Entity)
141+
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](slaTrailEntities))
142+
}
143+
131144
g.Go(func() error {
132-
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
145+
if slaTrailEntities != nil {
146+
defer close(slaTrailEntities)
147+
}
148+
149+
return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...)
133150
})
151+
152+
if slaTrailEntities != nil {
153+
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
154+
g.Go(func() error {
155+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, slaTrailEntities, "create"))
156+
})
157+
}
134158
}
135159

136160
// Update
@@ -160,6 +184,40 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
160184
// Delete
161185
if len(delta.Delete) > 0 {
162186
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
187+
entity := delta.Subject.Entity()
188+
switch entity.(type) {
189+
case *v1.Host, *v1.Service:
190+
g.Go(func() error {
191+
s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(entity), ' '))
192+
193+
var entities <-chan contracts.Entity
194+
var columns interface{}
195+
196+
if _, ok := entity.(*v1.Host); ok {
197+
columns = &SlaHostHistoryTrailColumns{}
198+
} else {
199+
columns = &SlaServiceHistoryTrailColumns{}
200+
}
201+
202+
query := s.db.BuildSelectStmt(entity, columns)
203+
if len(delta.Delete) == 1 {
204+
query += ` WHERE id = ?`
205+
} else {
206+
var placeholders []string
207+
for i := 0; i < len(delta.Delete); i++ {
208+
placeholders = append(placeholders, "?")
209+
}
210+
211+
query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `))
212+
}
213+
var err <-chan error
214+
entities, err = s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...)
215+
com.ErrgroupReceive(g, err)
216+
217+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, entities, "delete"))
218+
})
219+
}
220+
163221
g.Go(func() error {
164222
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
165223
})
@@ -187,7 +245,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
187245

188246
actualCvs, errs := s.db.YieldAll(
189247
ctx, cv.FactoryForDelta(),
190-
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(),
248+
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()),
249+
true, e.Meta(),
191250
)
192251
com.ErrgroupReceive(g, errs)
193252

@@ -199,7 +258,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
199258

200259
actualFlatCvs, errs := s.db.YieldAll(
201260
ctx, flatCv.FactoryForDelta(),
202-
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(),
261+
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()),
262+
true, e.Meta(),
203263
)
204264
com.ErrgroupReceive(g, errs)
205265

pkg/types/int.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"database/sql/driver"
77
"encoding"
88
"encoding/json"
9+
"fmt"
910
"github.com/icinga/icingadb/internal"
11+
"github.com/icinga/icingadb/pkg/contracts"
1012
"strconv"
1113
)
1214

@@ -58,11 +60,16 @@ func (i *Int) UnmarshalJSON(data []byte) error {
5860
return nil
5961
}
6062

63+
func (i Int) String() string {
64+
return fmt.Sprintf("%d", i.Int64)
65+
}
66+
6167
// Assert interface compliance.
6268
var (
6369
_ json.Marshaler = Int{}
6470
_ json.Unmarshaler = (*Int)(nil)
6571
_ encoding.TextUnmarshaler = (*Int)(nil)
6672
_ driver.Valuer = Int{}
6773
_ sql.Scanner = (*Int)(nil)
74+
_ contracts.ID = (*Int)(nil)
6875
)

schema/mysql/schema.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime (
13211321
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
13221322
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
13231323

1324+
CREATE TABLE sla_history_trail (
1325+
id bigint NOT NULL AUTO_INCREMENT,
1326+
environment_id binary(20) NOT NULL COMMENT 'environment.id',
1327+
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
1328+
service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)',
1329+
1330+
event_type enum('delete', 'create') NOT NULL,
1331+
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred',
1332+
1333+
PRIMARY KEY (id)
1334+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
1335+
13241336
CREATE TABLE icingadb_schema (
13251337
id int unsigned NOT NULL AUTO_INCREMENT,
13261338
version smallint unsigned NOT NULL,

schema/pgsql/schema.sql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' );
1717
CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' );
1818
CREATE TYPE state_type AS ENUM ( 'hard', 'soft' );
1919
CREATE TYPE checkable_type AS ENUM ( 'host', 'service' );
20+
CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' );
2021
CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' );
2122
CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' );
2223
CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' );
@@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc
21472148
COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime';
21482149
COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime';
21492150

2151+
CREATE TABLE sla_history_trail (
2152+
id bigserial NOT NULL,
2153+
environment_id bytea20 NOT NULL,
2154+
host_id bytea20 NOT NULL,
2155+
service_id bytea20 DEFAULT NULL,
2156+
2157+
event_type sla_trail_event_type NOT NULL,
2158+
event_time biguint NOT NULL,
2159+
2160+
CONSTRAINT pk_sla_history_trail PRIMARY KEY (id)
2161+
);
2162+
2163+
COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id';
2164+
COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)';
2165+
COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)';
2166+
COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred';
2167+
21502168
CREATE SEQUENCE icingadb_schema_id_seq;
21512169

21522170
CREATE TABLE icingadb_schema (

tests/object_sync_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
310310
t.Skip()
311311
})
312312

313+
t.Run("Sla History Trail", func(t *testing.T) {
314+
t.Parallel()
315+
316+
assert.Eventuallyf(t, func() bool {
317+
var count int
318+
err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL")
319+
require.NoError(t, err, "querying hosts sla history trail should not fail")
320+
return count == len(data.Hosts)
321+
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")
322+
323+
assert.Eventuallyf(t, func() bool {
324+
var count int
325+
err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL")
326+
require.NoError(t, err, "querying services sla history trail should not fail")
327+
return count == len(data.Services)
328+
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
329+
})
330+
313331
t.Run("RuntimeUpdates", func(t *testing.T) {
314332
t.Parallel()
315333

0 commit comments

Comments
 (0)