11package db
22
3- import "github.com/flanksource/config-db/api"
3+ import (
4+ "sync"
5+ "time"
6+
7+ sw "github.com/RussellLuo/slidingwindow"
8+ "github.com/google/uuid"
9+
10+ "github.com/flanksource/config-db/api"
11+ "github.com/flanksource/config-db/db/models"
12+ "github.com/flanksource/config-db/pkg/ratelimit"
13+ )
14+
15+ const (
16+ rateLimitWindow = time .Hour * 4
17+ maxChangesInWindow = 100
18+ )
419
520func GetWorkflowRunCount (ctx api.ScrapeContext , workflowID string ) (int64 , error ) {
621 var count int64
@@ -10,3 +25,84 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
1025 Error
1126 return count , err
1227}
28+
29+ var (
30+ scraperLocks = sync.Map {}
31+ configRateLimiters = map [string ]* sw.Limiter {}
32+ )
33+
34+ func rateLimitChanges (ctx api.ScrapeContext , newChanges []* models.ConfigChange ) ([]* models.ConfigChange , error ) {
35+ if len (newChanges ) == 0 {
36+ return nil , nil
37+ }
38+
39+ lock , loaded := scraperLocks .LoadOrStore (ctx .ScrapeConfig ().GetPersistedID (), & sync.Mutex {})
40+ lock .(* sync.Mutex ).Lock ()
41+ defer lock .(* sync.Mutex ).Unlock ()
42+
43+ window := ctx .Properties ().Duration ("changes.max.window" , rateLimitWindow )
44+ max := ctx .Properties ().Int ("changes.max.count" , maxChangesInWindow )
45+
46+ if ! loaded {
47+ // populate the rate limit window for the scraper
48+ query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes
49+ WHERE change_type != 'TooManyChanges'
50+ AND NOW() - created_at <= ? GROUP BY config_id`
51+ rows , err := ctx .DB ().Raw (query , window ).Rows ()
52+ if err != nil {
53+ return nil , err
54+ }
55+
56+ for rows .Next () {
57+ var configID string
58+ var count int
59+ var earliest time.Time
60+ if err := rows .Scan (& configID , & count , & earliest ); err != nil {
61+ return nil , err
62+ }
63+
64+ rateLimiter , _ := sw .NewLimiter (window , int64 (max ), func () (sw.Window , sw.StopFunc ) {
65+ win , stopper := ratelimit .NewLocalWindow ()
66+ if count > 0 {
67+ win .SetStart (earliest )
68+ win .AddCount (int64 (count ))
69+ }
70+ return win , stopper
71+ })
72+ configRateLimiters [configID ] = rateLimiter
73+ }
74+ }
75+
76+ passingNewChanges := make ([]* models.ConfigChange , 0 , len (newChanges ))
77+ rateLimited := map [string ]struct {}{}
78+ for _ , change := range newChanges {
79+ rateLimiter , ok := configRateLimiters [change .ConfigID ]
80+ if ! ok {
81+ rl , _ := sw .NewLimiter (window , int64 (max ), func () (sw.Window , sw.StopFunc ) {
82+ return sw .NewLocalWindow ()
83+ })
84+ configRateLimiters [change .ConfigID ] = rl
85+ rateLimiter = rl
86+ }
87+
88+ if ! rateLimiter .Allow () {
89+ ctx .Logger .V (2 ).Infof ("change rate limited (config=%s)" , change .ConfigID )
90+ rateLimited [change .ConfigID ] = struct {}{}
91+ continue
92+ }
93+
94+ passingNewChanges = append (passingNewChanges , change )
95+ }
96+
97+ // For all the rate limited configs, we add a new "TooManyChanges" change
98+ for configID := range rateLimited {
99+ passingNewChanges = append (passingNewChanges , & models.ConfigChange {
100+ ConfigID : configID ,
101+ Summary : "Changes on this config has been rate limited" ,
102+ ChangeType : "TooManyChanges" ,
103+ ExternalChangeId : uuid .New ().String (),
104+ })
105+ }
106+
107+ return passingNewChanges , nil
108+ }
0 commit comments