55 "time"
66
77 sw "github.com/RussellLuo/slidingwindow"
8- "github.com/google/uuid"
98
9+ "github.com/flanksource/commons/collections"
1010 "github.com/flanksource/config-db/api"
1111 "github.com/flanksource/config-db/db/models"
1212 "github.com/flanksource/config-db/pkg/ratelimit"
@@ -15,6 +15,8 @@ import (
1515const (
1616 rateLimitWindow = time .Hour * 4
1717 maxChangesInWindow = 100
18+
19+ ChangeTypeTooManyChanges = "TooManyChanges"
1820)
1921
2022func GetWorkflowRunCount (ctx api.ScrapeContext , workflowID string ) (int64 , error ) {
@@ -29,52 +31,44 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
2931var (
3032 scraperLocks = sync.Map {}
3133 configRateLimiters = map [string ]* sw.Limiter {}
34+
35+ // List of configs that have been rate limited.
36+ // It's used to avoid inserting mutliple "TooManyChanges" changes for the same config.
37+ rateLimitedConfigsPerScraper = sync.Map {}
3238)
3339
34- func rateLimitChanges (ctx api.ScrapeContext , newChanges []* models.ConfigChange ) ([]* models.ConfigChange , error ) {
35- if len (newChanges ) == 0 {
36- return nil , nil
40+ func rateLimitChanges (ctx api.ScrapeContext , newChanges []* models.ConfigChange ) ([]* models.ConfigChange , [] string , error ) {
41+ if len (newChanges ) == 0 || ctx . ScrapeConfig (). GetPersistedID () == nil {
42+ return newChanges , nil , nil
3743 }
3844
39- lock , loaded := scraperLocks .LoadOrStore (ctx .ScrapeConfig ().GetPersistedID (), & sync.Mutex {})
45+ window := ctx .Properties ().Duration ("changes.max.window" , rateLimitWindow )
46+ max := ctx .Properties ().Int ("changes.max.count" , maxChangesInWindow )
47+ scraperID := ctx .ScrapeConfig ().GetPersistedID ().String ()
48+
49+ lock , loaded := scraperLocks .LoadOrStore (scraperID , & sync.Mutex {})
4050 lock .(* sync.Mutex ).Lock ()
4151 defer lock .(* sync.Mutex ).Unlock ()
4252
43- window := ctx . Properties (). Duration ( "changes.max.window" , rateLimitWindow )
44- max := ctx . Properties (). Int ( "changes.max.count" , maxChangesInWindow )
53+ _rateLimitedConfigs , _ := rateLimitedConfigsPerScraper . LoadOrStore ( scraperID , map [ string ] struct {}{} )
54+ rateLimitedConfigs := _rateLimitedConfigs .( map [ string ] struct {} )
4555
4656 if ! loaded {
47- // populate the rate limit window for the scraper
48- query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes
49- WHERE change_type != 'TooManyChanges'
50- AND NOW() - created_at <= ? GROUP BY config_id`
51- rows , err := ctx .DB ().Raw (query , window ).Rows ()
52- if err != nil {
53- return nil , err
57+ // This is the initial sync of the rate limiter with the database.
58+ // Happens only once for each scrape config.
59+ if err := syncWindow (ctx , max , window ); err != nil {
60+ return nil , nil , err
5461 }
5562
56- for rows .Next () {
57- var configID string
58- var count int
59- var earliest time.Time
60- if err := rows .Scan (& configID , & count , & earliest ); err != nil {
61- return nil , err
62- }
63-
64- rateLimiter , _ := sw .NewLimiter (window , int64 (max ), func () (sw.Window , sw.StopFunc ) {
65- win , stopper := ratelimit .NewLocalWindow ()
66- if count > 0 {
67- win .SetStart (earliest )
68- win .AddCount (int64 (count ))
69- }
70- return win , stopper
71- })
72- configRateLimiters [configID ] = rateLimiter
63+ if rlConfigs , err := syncCurrentlyRateLimitedConfigs (ctx , window ); err != nil {
64+ return nil , nil , err
65+ } else {
66+ rateLimitedConfigs = rlConfigs
7367 }
7468 }
7569
70+ rateLimitedThisRun := map [string ]struct {}{}
7671 passingNewChanges := make ([]* models.ConfigChange , 0 , len (newChanges ))
77- rateLimited := map [string ]struct {}{}
7872 for _ , change := range newChanges {
7973 rateLimiter , ok := configRateLimiters [change .ConfigID ]
8074 if ! ok {
@@ -86,23 +80,102 @@ func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange)
8680 }
8781
8882 if ! rateLimiter .Allow () {
89- ctx .Logger .V (2 ).Infof ("change rate limited (config=%s)" , change .ConfigID )
90- rateLimited [change .ConfigID ] = struct {}{}
83+ ctx .Logger .V (1 ).Infof ("change rate limited (config=%s, external_id=%s, type=%s )" , change .ConfigID , change . ExternalChangeId , change . ChangeType )
84+ rateLimitedThisRun [change .ConfigID ] = struct {}{}
9185 continue
86+ } else {
87+ delete (rateLimitedConfigs , change .ConfigID )
9288 }
9389
9490 passingNewChanges = append (passingNewChanges , change )
9591 }
9692
97- // For all the rate limited configs, we add a new "TooManyChanges" change
98- for configID := range rateLimited {
99- passingNewChanges = append (passingNewChanges , & models.ConfigChange {
100- ConfigID : configID ,
101- Summary : "Changes on this config has been rate limited" ,
102- ChangeType : "TooManyChanges" ,
103- ExternalChangeId : uuid .New ().String (),
93+ var newlyRateLimited []string
94+ for configID := range rateLimitedThisRun {
95+ if _ , ok := rateLimitedConfigs [configID ]; ! ok {
96+ newlyRateLimited = append (newlyRateLimited , configID )
97+ }
98+ }
99+
100+ rateLimitedConfigs = collections .MergeMap (rateLimitedConfigs , rateLimitedThisRun )
101+ rateLimitedConfigsPerScraper .Store (scraperID , rateLimitedConfigs )
102+
103+ return passingNewChanges , newlyRateLimited , nil
104+ }
105+
106+ func syncCurrentlyRateLimitedConfigs (ctx api.ScrapeContext , window time.Duration ) (map [string ]struct {}, error ) {
107+ query := `WITH latest_changes AS (
108+ SELECT
109+ DISTINCT ON (config_id) config_id,
110+ change_type
111+ FROM
112+ config_changes
113+ LEFT JOIN config_items ON config_items.id = config_changes.config_id
114+ WHERE
115+ scraper_id = ?
116+ AND NOW() - config_changes.created_at <= ?
117+ ORDER BY
118+ config_id,
119+ config_changes.created_at DESC
120+ ) SELECT config_id FROM latest_changes WHERE change_type = ?`
121+ rows , err := ctx .DB ().Raw (query , ctx .ScrapeConfig ().GetPersistedID (), window , ChangeTypeTooManyChanges ).Rows ()
122+ if err != nil {
123+ return nil , err
124+ }
125+
126+ output := make (map [string ]struct {})
127+ for rows .Next () {
128+ var id string
129+ if err := rows .Scan (& id ); err != nil {
130+ return nil , err
131+ }
132+
133+ ctx .Logger .V (1 ).Infof ("config %s is currently found to be rate limited" , id )
134+ output [id ] = struct {}{}
135+ }
136+
137+ return output , rows .Err ()
138+ }
139+
140+ // syncWindow syncs the rate limit window for the scraper with the changes in the db.
141+ func syncWindow (ctx api.ScrapeContext , max int , window time.Duration ) error {
142+ query := `SELECT
143+ config_id,
144+ COUNT(*),
145+ MIN(config_changes.created_at) AS min_created_at
146+ FROM
147+ config_changes
148+ LEFT JOIN config_items ON config_items.id = config_changes.config_id
149+ WHERE
150+ scraper_id = ?
151+ AND change_type != ?
152+ AND NOW() - config_changes.created_at <= ?
153+ GROUP BY
154+ config_id`
155+ rows , err := ctx .DB ().Raw (query , ctx .ScrapeConfig ().GetPersistedID (), ChangeTypeTooManyChanges , window ).Rows ()
156+ if err != nil {
157+ return err
158+ }
159+
160+ for rows .Next () {
161+ var configID string
162+ var count int
163+ var earliest time.Time
164+ if err := rows .Scan (& configID , & count , & earliest ); err != nil {
165+ return err
166+ }
167+ ctx .Logger .V (3 ).Infof ("config %s has %d changes in the last %s" , configID , count , window )
168+
169+ rateLimiter , _ := sw .NewLimiter (window , int64 (max ), func () (sw.Window , sw.StopFunc ) {
170+ win , stopper := ratelimit .NewLocalWindow ()
171+ if count > 0 {
172+ win .SetStart (earliest )
173+ win .AddCount (int64 (count ))
174+ }
175+ return win , stopper
104176 })
177+ configRateLimiters [configID ] = rateLimiter
105178 }
106179
107- return passingNewChanges , nil
180+ return rows . Err ()
108181}
0 commit comments