Skip to content

Commit 9e039f5

Browse files
committed
feat: rate limits on changes per config
1 parent 121f062 commit 9e039f5

File tree

4 files changed

+146
-1
lines changed

4 files changed

+146
-1
lines changed

db/update.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"slices"
77
"strconv"
88
"strings"
9+
"sync"
910
"time"
1011

12+
sw "github.com/RussellLuo/slidingwindow"
1113
"github.com/aws/smithy-go/ptr"
1214
"github.com/dominikbraun/graph"
1315
jsonpatch "github.com/evanphx/json-patch"
@@ -29,9 +31,16 @@ import (
2931
"github.com/samber/lo"
3032
"gorm.io/gorm"
3133
"gorm.io/gorm/clause"
34+
35+
"github.com/flanksource/config-db/pkg/ratelimit"
3236
)
3337

34-
const configItemsBulkInsertSize = 200
38+
const (
39+
configItemsBulkInsertSize = 200
40+
41+
rateLimitWindow = time.Hour * 4
42+
maxChangesInWindow = 100
43+
)
3544

3645
func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error {
3746
var deletedAt interface{}
@@ -325,6 +334,83 @@ func UpdateAnalysisStatusBefore(ctx api.ScrapeContext, before time.Time, scraper
325334
Error
326335
}
327336

337+
var muMap = sync.Map{}
338+
var configMap = map[string]*sw.Limiter{}
339+
340+
func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) {
341+
if len(newChanges) == 0 {
342+
return nil, nil
343+
}
344+
345+
lock, loaded := muMap.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &sync.Mutex{})
346+
lock.(*sync.Mutex).Lock()
347+
defer lock.(*sync.Mutex).Unlock()
348+
349+
window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
350+
max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
351+
352+
if !loaded {
353+
// populate the rate limit window for the scraper
354+
query := "SELECT config_id, COUNT(*), min(created_at) FROM config_changes WHERE NOW() - created_at <= ? GROUP BY config_id"
355+
rows, err := ctx.DB().Raw(query, window).Rows()
356+
if err != nil {
357+
return nil, err
358+
}
359+
360+
for rows.Next() {
361+
var configID string
362+
var count int
363+
var earliest time.Time
364+
if err := rows.Scan(&configID, &count, &earliest); err != nil {
365+
return nil, err
366+
}
367+
368+
rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
369+
win, stopper := ratelimit.NewLocalWindow()
370+
if count > 0 {
371+
win.SetStart(earliest)
372+
win.AddCount(int64(count))
373+
}
374+
return win, stopper
375+
})
376+
configMap[configID] = rateLimiter
377+
}
378+
}
379+
380+
passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges))
381+
rateLimited := map[string]struct{}{}
382+
for _, change := range newChanges {
383+
rateLimiter, ok := configMap[change.ConfigID]
384+
if !ok {
385+
rl, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
386+
return sw.NewLocalWindow()
387+
})
388+
configMap[change.ConfigID] = rl
389+
rateLimiter = rl
390+
}
391+
392+
if !rateLimiter.Allow() {
393+
ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID)
394+
rateLimited[change.ConfigID] = struct{}{}
395+
continue
396+
}
397+
398+
passingNewChanges = append(passingNewChanges, change)
399+
}
400+
401+
// For all the rate limited configs, we add a new "TooManyChanges" change
402+
for configID := range rateLimited {
403+
passingNewChanges = append(passingNewChanges, &models.ConfigChange{
404+
ConfigID: configID,
405+
Summary: "Changes on this config has been rate limited",
406+
ChangeType: "TooManyChanges",
407+
ExternalChangeId: uuid.New().String(),
408+
})
409+
}
410+
411+
return passingNewChanges, nil
412+
}
413+
328414
// SaveResults creates or update a configuration with config changes
329415
func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
330416
startTime, err := GetCurrentDBTime(ctx)
@@ -352,6 +438,11 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
352438
}
353439
}
354440

441+
newChanges, err = rateLimitChanges(ctx, newChanges)
442+
if err != nil {
443+
return fmt.Errorf("failed to rate limit changes: %w", err)
444+
}
445+
355446
if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil {
356447
return fmt.Errorf("failed to create config changes: %w", err)
357448
}

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0
2020
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager v1.0.0
2121
github.com/Jeffail/gabs/v2 v2.7.0
22+
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b
2223
github.com/aws/aws-sdk-go-v2 v1.18.0
2324
github.com/aws/aws-sdk-go-v2/config v1.18.25
2425
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
@@ -117,6 +118,7 @@ require (
117118
github.com/ghodss/yaml v1.0.0 // indirect
118119
github.com/go-logr/stdr v1.2.2 // indirect
119120
github.com/go-openapi/inflect v0.19.0 // indirect
121+
github.com/go-redis/redis v6.15.9+incompatible // indirect
120122
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
121123
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
122124
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
672672
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
673673
github.com/RaveNoX/go-jsonmerge v1.0.0 h1:2e0nqnadoGUP8rAvcA0hkQelZreVO5X3BHomT2XMrAk=
674674
github.com/RaveNoX/go-jsonmerge v1.0.0/go.mod h1:qYM/NA77LhO4h51JJM7Z+xBU3ovqrNIACZe+SkSNVFo=
675+
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU=
676+
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
675677
github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf h1:+GdVyvpzTy3UFAS1+hbTqm9Mk0U1Xrocm28s/E2GWz0=
676678
github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf/go.mod h1:FiuynIwe98RFhWI8nZ0dnsldPVsBy9rHH1hn2WYwme4=
677679
github.com/WinterYukky/gorm-extra-clause-plugin v0.2.0 h1:s1jobT8PlSyG/FXczfoGSt4r46iPiT4ZShe35k5/2y4=
@@ -926,6 +928,8 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB
926928
github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
927929
github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
928930
github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
931+
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
932+
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
929933
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
930934
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
931935
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=

pkg/ratelimit/ratelimit.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package ratelimit
2+
3+
import (
4+
"time"
5+
6+
sw "github.com/RussellLuo/slidingwindow"
7+
)
8+
9+
// LocalWindow represents a window that ignores sync behavior entirely
10+
// and only stores counters in memory.
11+
//
12+
// NOTE: It's an exact copy of the LocalWindow provided by RussellLuo/slidingwindow
13+
// with an added capability of setting a custom start time.
14+
type LocalWindow struct {
15+
// The start boundary (timestamp in nanoseconds) of the window.
16+
// [start, start + size)
17+
start int64
18+
19+
// The total count of events happened in the window.
20+
count int64
21+
}
22+
23+
func NewLocalWindow() (*LocalWindow, sw.StopFunc) {
24+
return &LocalWindow{}, func() {}
25+
}
26+
27+
func (w *LocalWindow) SetStart(s time.Time) {
28+
w.start = s.UnixNano()
29+
}
30+
31+
func (w *LocalWindow) Start() time.Time {
32+
return time.Unix(0, w.start)
33+
}
34+
35+
func (w *LocalWindow) Count() int64 {
36+
return w.count
37+
}
38+
39+
func (w *LocalWindow) AddCount(n int64) {
40+
w.count += n
41+
}
42+
43+
func (w *LocalWindow) Reset(s time.Time, c int64) {
44+
w.start = s.UnixNano()
45+
w.count = c
46+
}
47+
48+
func (w *LocalWindow) Sync(now time.Time) {}

0 commit comments

Comments
 (0)