Skip to content

Commit 10de3f0

Browse files
func: add the possibilty to stop stop handlers by type
1 parent d01f41e commit 10de3f0

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

policy/handler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func TestHandler_Run_ScalingNeededAndCooldown_Integration(t *testing.T) {
530530
limiter: ml,
531531
}
532532

533-
must.NoError(t, handler.loadCheckRunners())
533+
must.NoError(t, handler.configureHorizontalPolicy())
534534

535535
go handler.Run(ctx)
536536
time.Sleep(30 * time.Millisecond)
@@ -686,7 +686,7 @@ func TestHandler_Run_StateChanges_Integration(t *testing.T) {
686686
nextAction: sdk.ScalingAction{},
687687
}
688688

689-
must.NoError(t, handler.loadCheckRunners())
689+
must.NoError(t, handler.configureHorizontalPolicy())
690690

691691
go handler.Run(ctx)
692692
time.Sleep(30 * time.Millisecond)

policy/manager.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"maps"
11+
"slices"
1112
"strings"
1213
"sync"
1314
"time"
@@ -22,8 +23,9 @@ import (
2223
const DefaultLimiterTimeout = 2 * time.Minute
2324

2425
type handlerTracker struct {
25-
cancel context.CancelFunc
26-
updates chan<- *sdk.ScalingPolicy
26+
policyType string
27+
cancel context.CancelFunc
28+
updates chan<- *sdk.ScalingPolicy
2729
}
2830

2931
type targetGetter interface {
@@ -250,8 +252,9 @@ func (m *Manager) processMessageAndUpdateHandlers(ctx context.Context, message I
250252
upCh := make(chan *sdk.ScalingPolicy, 1)
251253

252254
nht := &handlerTracker{
253-
updates: upCh,
254-
cancel: handlerCancel,
255+
updates: upCh,
256+
cancel: handlerCancel,
257+
policyType: updatedPolicy.Type,
255258
}
256259

257260
m.log.Debug("creating new policy handler",
@@ -304,21 +307,13 @@ func (m *Manager) stopHandlers() {
304307
for source, handlers := range m.handlers {
305308
for id := range handlers {
306309
m.stopHandler(source, id)
310+
delete(handlers, id)
307311
}
312+
308313
delete(m.handlers, source)
309314
}
310315
}
311316

312-
func (m *Manager) addHandlerTracker(source SourceName, id PolicyID, nht *handlerTracker) {
313-
m.handlersLock.Lock()
314-
if m.handlers[source] == nil {
315-
m.handlers[source] = make(map[PolicyID]*handlerTracker)
316-
}
317-
318-
m.handlers[source][id] = nht
319-
m.handlersLock.Unlock()
320-
}
321-
322317
// stopHandler stops a handler and removes it from the manager's internal
323318
// state storage.
324319
//
@@ -332,6 +327,30 @@ func (m *Manager) stopHandler(source SourceName, id PolicyID) {
332327
close(ht.updates)
333328
}
334329

330+
func (m *Manager) StopHandlersByType(typesToStop ...string) {
331+
m.handlersLock.Lock()
332+
defer m.handlersLock.Unlock()
333+
334+
for source, handlers := range m.handlers {
335+
for id, tracker := range handlers {
336+
if slices.Contains(typesToStop, tracker.policyType) {
337+
m.stopHandler(source, id)
338+
delete(handlers, id)
339+
}
340+
}
341+
}
342+
}
343+
344+
func (m *Manager) addHandlerTracker(source SourceName, id PolicyID, nht *handlerTracker) {
345+
m.handlersLock.Lock()
346+
if m.handlers[source] == nil {
347+
m.handlers[source] = make(map[PolicyID]*handlerTracker)
348+
}
349+
350+
m.handlers[source][id] = nht
351+
m.handlersLock.Unlock()
352+
}
353+
335354
// ReloadSources triggers a reload of all the policy sources.
336355
func (m *Manager) ReloadSources() {
337356

0 commit comments

Comments
 (0)