From 1a3aa707a0de626968a6fa02421bbd369400bd4b Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 12 Aug 2025 10:28:08 +0200 Subject: [PATCH 01/21] style: pass all the workers configuration to the NewLimiter func --- agent/agent.go | 4 ++-- policy/manager.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 236ea114..56e5e2bf 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -74,12 +74,12 @@ func (a *Agent) Run(ctx context.Context) error { defer close(policyEvalCh) limiter := policy.NewLimiter(policy.DefaultLimiterTimeout, - a.config.PolicyEval.Workers["horizontal"], - a.config.PolicyEval.Workers["cluster"]) + a.config.PolicyEval.Workers) if err := a.setupPolicyManager(limiter); err != nil { return fmt.Errorf("failed to setup policy manager: %v", err) } + go a.policyManager.Run(ctx, policyEvalCh) a.initEnt(ctx, a.entReload) diff --git a/policy/manager.go b/policy/manager.go index fb5b1ca4..c30996a8 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -388,12 +388,12 @@ type Limiter struct { var ErrExecutionTimeout = errors.New("timeout while waiting for slot") -func NewLimiter(timeout time.Duration, hWorkersCount, cWorkersCount int) *Limiter { +func NewLimiter(timeout time.Duration, workersConfig map[string]int) *Limiter { return &Limiter{ timeout: timeout, slots: map[string]chan struct{}{ - "horizontal": make(chan struct{}, hWorkersCount), - "cluster": make(chan struct{}, cWorkersCount), + "horizontal": make(chan struct{}, workersConfig["horizontal"]), + "cluster": make(chan struct{}, workersConfig["cluster"]), }, } } From 12f20337e23848088063cfe82e34f3570cf87754 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 12 Aug 2025 14:04:08 +0200 Subject: [PATCH 02/21] func: add oss support for vertical policies --- policy/handler.go | 42 ++++++++++++++++++++++++++++++++++-------- 
policy/handler_oss.go | 24 ++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 8 deletions(-) create mode 100644 policy/handler_oss.go diff --git a/policy/handler.go b/policy/handler.go index af4c6ace..b93965a2 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -94,7 +94,10 @@ type Handler struct { cooldownLock sync.RWMutex outOfCooldownOn time.Time - pm dependencyGetter + pm dependencyGetter + calculateNewCount func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) + minCount int64 + maxCount int64 } type HandlerConfig struct { @@ -124,9 +127,18 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { state: StateIdle, } - err := h.loadCheckRunners() - if err != nil { - return nil, fmt.Errorf("failed to load check handlers: %w", err) + switch config.Policy.Type { + case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: + err := h.configureHorizontalPolicy() + if err != nil { + return nil, fmt.Errorf("failed to configure horizontal policy: %w", err) + } + default: + err := h.configureVerticalPolicy() + if err != nil { + return nil, fmt.Errorf("failed to configure horizontal policy: %w", err) + } + } currentStatus, err := h.runTargetStatus() @@ -161,6 +173,19 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { return h, nil } +func (h *Handler) configureHorizontalPolicy() error { + h.minCount = h.policy.Min + h.maxCount = h.policy.Max + + err := h.loadCheckRunners() + if err != nil { + return fmt.Errorf("failed to load check handlers: %w", err) + } + + h.calculateNewCount = h.calculateHorizontalNewCount + return nil +} + func (h *Handler) loadCheckRunners() error { for _, check := range h.policy.Checks { @@ -244,7 +269,7 @@ func (h *Handler) Run(ctx context.Context) { // Canonicalize action so plugins don't have to. 
action.Canonicalize() - action.CapCount(h.policy.Min, h.policy.Max) + action.CapCount(h.minCount, h.maxCount) h.log.Info("calculating scaling target", "policy_id", h.policy.ID, "from", currentCount, "to", @@ -393,10 +418,11 @@ func (h *Handler) applyMutators(p *sdk.ScalingPolicy) { } } -// Handle policy is the main part of the controller, it reads the target state, +// calculateHorizontalNewCount is the main part of the controller, it // gets the metrics and the necessary new count to keep up with the policy -// and generates a scaling action if needed. -func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { +// and generates a scaling action if needed, but only for horizontal policies: +// horizontal app and horizontal cluster scaling policies. +func (h *Handler) calculateHorizontalNewCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { h.log.Debug("received policy for evaluation") // Record the start time of the eval portion of this function. The labels diff --git a/policy/handler_oss.go b/policy/handler_oss.go new file mode 100644 index 00000000..e8fb733e --- /dev/null +++ b/policy/handler_oss.go @@ -0,0 +1,24 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +//go:build !ent +// +build !ent + +package policy + +import ( + "context" + + "github.com/hashicorp/nomad-autoscaler/sdk" +) + +func (h *Handler) configureVerticalPolicy() error { + + h.calculateNewCount = func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { + return sdk.ScalingAction{ + Count: currentCount, + Reason: "Vertical scaling is not supported in OSS mode", + }, nil + } + return nil +} From c0ebf91201654e4821288d22f775f6bd624f1a25 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 12 Aug 2025 14:37:24 +0200 Subject: [PATCH 03/21] fix: update all values of the handler on policy update --- policy/handler.go | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/policy/handler.go b/policy/handler.go index b93965a2..bfb54d08 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -136,9 +136,8 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { default: err := h.configureVerticalPolicy() if err != nil { - return nil, fmt.Errorf("failed to configure horizontal policy: %w", err) + return nil, fmt.Errorf("failed to configure vertical policy: %w", err) } - } currentStatus, err := h.runTargetStatus() @@ -174,19 +173,21 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { } func (h *Handler) configureHorizontalPolicy() error { - h.minCount = h.policy.Min - h.maxCount = h.policy.Max err := h.loadCheckRunners() if err != nil { return fmt.Errorf("failed to load check handlers: %w", err) } + h.minCount = h.policy.Min + h.maxCount = h.policy.Max + h.calculateNewCount = h.calculateHorizontalNewCount return nil } func (h *Handler) loadCheckRunners() error { + runners := []*checkRunner{} for _, check := range h.policy.Checks { s, err := h.pm.GetStrategyRunner(check.Strategy.Name) @@ -207,9 +208,10 @@ func (h *Handler) loadCheckRunners() error { Policy: h.policy, }, check) - 
h.checkRunners = append(h.checkRunners, runner) + runners = append(runners, runner) } + h.checkRunners = runners return nil } @@ -393,19 +395,26 @@ func (h *Handler) updateHandler(updatedPolicy *sdk.ScalingPolicy) { h.policyLock.Lock() defer h.policyLock.Unlock() - h.log.Debug("updating check handlers", "old_checks", len(h.policy.Checks), "new_checks", len(updatedPolicy.Checks)) - - // Clear existing check handlers and load new ones. - h.checkRunners = nil + switch updatedPolicy.Type { + case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: + h.log.Debug("updating check handlers", "old_checks", len(h.policy.Checks), + "new_checks", len(updatedPolicy.Checks)) + err := h.configureHorizontalPolicy() + if err != nil { + h.errChn <- fmt.Errorf("unable to update horizontal policy: %w", err) + return + } - err := h.loadCheckRunners() - if err != nil { - h.errChn <- fmt.Errorf("unable to update policy, failed to load check handlers: %w", err) - return + default: + err := h.configureVerticalPolicy() + if err != nil { + h.errChn <- fmt.Errorf("unable to update vertical policy: %w", err) + } } - h.log.Debug("check handlers updated", "count", len(h.checkRunners)) + h.log.Debug("policy successfully updated") h.policy = updatedPolicy + } // applyMutators applies the mutators registered with the handler in order and From d01f41ea2099dbf5f90ff9aac5c00b67fd331c5e Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 12 Aug 2025 15:20:43 +0200 Subject: [PATCH 04/21] func: update integration handler tests --- policy/handler.go | 22 ++++++++++++---------- policy/handler_oss.go | 4 ++++ policy/handler_test.go | 2 +- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/policy/handler.go b/policy/handler.go index bfb54d08..6322824e 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -173,14 +173,17 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { } func (h *Handler) 
configureHorizontalPolicy() error { + return h.updateHorizontalPolicy(h.policy) +} +func (h *Handler) updateHorizontalPolicy(up *sdk.ScalingPolicy) error { err := h.loadCheckRunners() if err != nil { return fmt.Errorf("failed to load check handlers: %w", err) } - h.minCount = h.policy.Min - h.maxCount = h.policy.Max + h.minCount = up.Min + h.maxCount = up.Max h.calculateNewCount = h.calculateHorizontalNewCount return nil @@ -211,6 +214,7 @@ func (h *Handler) loadCheckRunners() error { runners = append(runners, runner) } + // Do the update as a single operation to avoid partial updates. h.checkRunners = runners return nil } @@ -397,24 +401,22 @@ func (h *Handler) updateHandler(updatedPolicy *sdk.ScalingPolicy) { switch updatedPolicy.Type { case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: - h.log.Debug("updating check handlers", "old_checks", len(h.policy.Checks), - "new_checks", len(updatedPolicy.Checks)) - err := h.configureHorizontalPolicy() + err := h.updateHorizontalPolicy(updatedPolicy) if err != nil { - h.errChn <- fmt.Errorf("unable to update horizontal policy: %w", err) + h.errChn <- fmt.Errorf("unable to update horizontal policy %w", err) return } default: - err := h.configureVerticalPolicy() + err := h.updateVerticalPolicy(updatedPolicy) if err != nil { - h.errChn <- fmt.Errorf("unable to update vertical policy: %w", err) + h.errChn <- fmt.Errorf("unable to update vertical policy %w", err) + return } } - h.log.Debug("policy successfully updated") h.policy = updatedPolicy - + h.log.Debug("policy successfully updated") } // applyMutators applies the mutators registered with the handler in order and diff --git a/policy/handler_oss.go b/policy/handler_oss.go index e8fb733e..f5841e7a 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -22,3 +22,7 @@ func (h *Handler) configureVerticalPolicy() error { } return nil } + +func (h *Handler) updateVerticalPolicy(up *sdk.ScalingPolicy) error { + return h.configureVerticalPolicy() +} 
diff --git a/policy/handler_test.go b/policy/handler_test.go index 54e54fd0..fd577eef 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -467,7 +467,7 @@ func TestHandler_Run_ScalingNotNeeded_Integration(t *testing.T) { pm: mdg, } - must.NoError(t, handler.loadCheckRunners()) + must.NoError(t, handler.configureHorizontalPolicy()) go handler.Run(ctx) time.Sleep(30 * time.Millisecond) From e96c2a6292e8389239a7a982e566d407ade2b413 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 12 Aug 2025 17:19:55 +0200 Subject: [PATCH 05/21] func: add the possibility to stop handlers by type --- policy/handler_test.go | 2 +- policy/manager.go | 47 +++++++++++++++++++++++++++++------------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/policy/handler_test.go b/policy/handler_test.go index fd577eef..5a3fd7c7 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -530,7 +530,7 @@ func TestHandler_Run_ScalingNeededAndCooldown_Integration(t *testing.T) { limiter: ml, } - must.NoError(t, handler.loadCheckRunners()) + must.NoError(t, handler.configureHorizontalPolicy()) go handler.Run(ctx) time.Sleep(30 * time.Millisecond) diff --git a/policy/manager.go b/policy/manager.go index c30996a8..c7469998 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "maps" + "slices" "strings" "sync" "time" @@ -22,8 +23,9 @@ import ( const DefaultLimiterTimeout = 2 * time.Minute type handlerTracker struct { - cancel context.CancelFunc - updates chan<- *sdk.ScalingPolicy + policyType string + cancel context.CancelFunc + updates chan<- *sdk.ScalingPolicy } type targetGetter interface { @@ -250,8 +252,9 @@ func (m *Manager) processMessageAndUpdateHandlers(ctx context.Context, message I upCh := make(chan *sdk.ScalingPolicy, 1) nht := &handlerTracker{ - updates: upCh, - cancel: handlerCancel, + updates: upCh, + cancel: handlerCancel, + policyType: 
updatedPolicy.Type, } m.log.Debug("creating new policy handler", @@ -304,21 +307,13 @@ func (m *Manager) stopHandlers() { for source, handlers := range m.handlers { for id := range handlers { m.stopHandler(source, id) + delete(handlers, id) } + delete(m.handlers, source) } } -func (m *Manager) addHandlerTracker(source SourceName, id PolicyID, nht *handlerTracker) { - m.handlersLock.Lock() - if m.handlers[source] == nil { - m.handlers[source] = make(map[PolicyID]*handlerTracker) - } - - m.handlers[source][id] = nht - m.handlersLock.Unlock() -} - // stopHandler stops a handler and removes it from the manager's internal // state storage. // @@ -332,6 +327,30 @@ func (m *Manager) stopHandler(source SourceName, id PolicyID) { close(ht.updates) } +func (m *Manager) StopHandlersByType(typesToStop ...string) { + m.handlersLock.Lock() + defer m.handlersLock.Unlock() + + for source, handlers := range m.handlers { + for id, tracker := range handlers { + if slices.Contains(typesToStop, tracker.policyType) { + m.stopHandler(source, id) + delete(handlers, id) + } + } + } +} + +func (m *Manager) addHandlerTracker(source SourceName, id PolicyID, nht *handlerTracker) { + m.handlersLock.Lock() + if m.handlers[source] == nil { + m.handlers[source] = make(map[PolicyID]*handlerTracker) + } + + m.handlers[source][id] = nht + m.handlersLock.Unlock() +} + // ReloadSources triggers a reload of all the policy sources. 
func (m *Manager) ReloadSources() { From 96cdf2edd01556d9033133dc0536b90816ec7e70 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Wed, 13 Aug 2025 11:22:47 +0200 Subject: [PATCH 06/21] fix: update integration tests to load the correct count function --- policy/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/policy/handler_test.go b/policy/handler_test.go index 5a3fd7c7..f7baa154 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -686,7 +686,7 @@ func TestHandler_Run_StateChanges_Integration(t *testing.T) { nextAction: sdk.ScalingAction{}, } - must.NoError(t, handler.loadCheckRunners()) + must.NoError(t, handler.configureHorizontalPolicy()) go handler.Run(ctx) time.Sleep(30 * time.Millisecond) From 519a63b322d989d42921fe83117b968d534feae3 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Wed, 13 Aug 2025 16:36:06 +0200 Subject: [PATCH 07/21] func: separate the manager configuration --- agent/agent_oss.go | 51 +++++++++++++++++++++++++++++++++++++++++++++- policy/manager.go | 12 +++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/agent/agent_oss.go b/agent/agent_oss.go index c03dc3c3..1bd7b84d 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -6,7 +6,56 @@ package agent -import "context" +import ( + "context" + "errors" + + "github.com/hashicorp/nomad-autoscaler/policy" + filePolicy "github.com/hashicorp/nomad-autoscaler/policy/file" + nomadPolicy "github.com/hashicorp/nomad-autoscaler/policy/nomad" +) + +func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error { + + // Create our processor, a shared method for performing basic policy + // actions. 
+ cfgDefaults := policy.ConfigDefaults{ + DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval, + DefaultCooldown: a.config.Policy.DefaultCooldown, + } + policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames()) + + // Setup our initial default policy source which is Nomad. + sources := map[policy.SourceName]policy.Source{} + for _, s := range a.config.Policy.Sources { + if s.Enabled == nil || !*s.Enabled { + continue + } + + switch policy.SourceName(s.Name) { + case policy.SourceNameNomad: + sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor) + case policy.SourceNameFile: + // Only setup the file source if operators have configured a + // scaling policy directory to read from. + if a.config.Policy.Dir != "" { + sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor) + } + } + } + + // TODO: Once full policy source reload is implemented this should probably + // be just a warning. + if len(sources) == 0 { + return errors.New("no policy source available") + } + + a.policySources = sources + a.policyManager = policy.NewManager(a.logger, a.policySources, + a.pluginManager, a.config.Telemetry.CollectionInterval, limiter) + + return nil +} func (a *Agent) initEnt(ctx context.Context, reload <-chan any) { go func() { diff --git a/policy/manager.go b/policy/manager.go index c7469998..e35e5331 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -62,6 +62,10 @@ type Manager struct { *Limiter pluginManager dependencyGetter + + // Ent only fields + evaluateAfter time.Duration + historicalAPMGetter historicalAPMGetter } // NewManager returns a new Manager. @@ -360,6 +364,10 @@ func (m *Manager) ReloadSources() { } } +func (m *Manager) SetEvaluateAfter(ea time.Duration) { + m.evaluateAfter = ea +} + // periodicMetricsReporter periodically emits metrics for the policy manager // which cannot be performed during inline function calls. 
func (m *Manager) periodicMetricsReporter(ctx context.Context, interval time.Duration) { @@ -417,6 +425,10 @@ func NewLimiter(timeout time.Duration, workersConfig map[string]int) *Limiter { } } +func (l *Limiter) AddQueue(queue string, size int) { + l.slots[queue] = make(chan struct{}, size) +} + func (l *Limiter) QueueSize(queue string) int { return len(l.slots[queue]) } From 1d24c90701fc19a96fcf306f367646f7b99ff833 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Wed, 13 Aug 2025 16:47:58 +0200 Subject: [PATCH 08/21] func: add handler_oss --- policy/handler.go | 24 +++++++++++++----------- policy/handler_oss.go | 2 ++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/policy/handler.go b/policy/handler.go index 6322824e..2eb85580 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -94,20 +94,22 @@ type Handler struct { cooldownLock sync.RWMutex outOfCooldownOn time.Time - pm dependencyGetter - calculateNewCount func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) - minCount int64 - maxCount int64 + pm dependencyGetter + historicalAPMGetter historicalAPMGetter + calculateNewCount func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) + minCount int64 + maxCount int64 } type HandlerConfig struct { - UpdatesChan chan *sdk.ScalingPolicy - ErrChan chan<- error - Policy *sdk.ScalingPolicy - Log hclog.Logger - TargetController targetpkg.Controller - Limiter *Limiter - DependencyGetter dependencyGetter + UpdatesChan chan *sdk.ScalingPolicy + ErrChan chan<- error + Policy *sdk.ScalingPolicy + Log hclog.Logger + TargetController targetpkg.Controller + Limiter *Limiter + DependencyGetter dependencyGetter + HistoricalAPMGetter historicalAPMGetter } func NewPolicyHandler(config HandlerConfig) (*Handler, error) { diff --git a/policy/handler_oss.go b/policy/handler_oss.go index f5841e7a..4dc8b3f1 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -12,6 
+12,8 @@ import ( "github.com/hashicorp/nomad-autoscaler/sdk" ) +type historicalAPMGetter interface{} + func (h *Handler) configureVerticalPolicy() error { h.calculateNewCount = func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { return sdk.ScalingAction{ From bbcd6e5191d4129e5abe7390d8f21f4f5ae4ae5b Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Wed, 13 Aug 2025 16:58:17 +0200 Subject: [PATCH 09/21] func: create the ent policy types and use the const instead of the string for the old ones --- policy/manager.go | 4 ++-- policy/nomad/source.go | 4 ++-- policy/nomad/validate.go | 5 +++-- sdk/policy.go | 6 ++++-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/policy/manager.go b/policy/manager.go index e35e5331..cff0242a 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -419,8 +419,8 @@ func NewLimiter(timeout time.Duration, workersConfig map[string]int) *Limiter { return &Limiter{ timeout: timeout, slots: map[string]chan struct{}{ - "horizontal": make(chan struct{}, workersConfig["horizontal"]), - "cluster": make(chan struct{}, workersConfig["cluster"]), + sdk.ScalingPolicyTypeHorizontal: make(chan struct{}, workersConfig["horizontal"]), + sdk.ScalingPolicyTypeCluster: make(chan struct{}, workersConfig["cluster"]), }, } } diff --git a/policy/nomad/source.go b/policy/nomad/source.go index ddb24494..82ed5c67 100644 --- a/policy/nomad/source.go +++ b/policy/nomad/source.go @@ -254,9 +254,9 @@ func (s *Source) canonicalizePolicy(p *sdk.ScalingPolicy) { func (s *Source) canonicalizePolicyByType(p *sdk.ScalingPolicy) { switch p.Type { - case "horizontal": + case sdk.ScalingPolicyTypeHorizontal: s.canonicalizeHorizontalPolicy(p) - case "cluster": + case sdk.ScalingPolicyTypeCluster: // Nothing to do for now. 
default: s.canonicalizeAdditionalTypes(p) diff --git a/policy/nomad/validate.go b/policy/nomad/validate.go index ba4cb120..22be0ff7 100644 --- a/policy/nomad/validate.go +++ b/policy/nomad/validate.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad-autoscaler/plugins" + "github.com/hashicorp/nomad-autoscaler/sdk" "github.com/hashicorp/nomad-autoscaler/sdk/helper/ptr" "github.com/hashicorp/nomad/api" ) @@ -54,9 +55,9 @@ func validateScalingPolicy(policy *api.ScalingPolicy) error { func validateScalingPolicyByType(policy *api.ScalingPolicy) error { switch policy.Type { - case "horizontal", "": + case sdk.ScalingPolicyTypeHorizontal, "": return validateHorizontalPolicy(policy) - case "cluster": + case sdk.ScalingPolicyTypeCluster: return validateClusterPolicy(policy) default: return additionalPolicyTypeValidation(policy) diff --git a/sdk/policy.go b/sdk/policy.go index 43dabc79..6612f5fb 100644 --- a/sdk/policy.go +++ b/sdk/policy.go @@ -13,8 +13,10 @@ import ( ) const ( - ScalingPolicyTypeCluster = "cluster" - ScalingPolicyTypeHorizontal = "horizontal" + ScalingPolicyTypeCluster = "cluster" + ScalingPolicyTypeHorizontal = "horizontal" + ScalingPolicyTypeVerticalCPU = "vertical_cpu" + ScalingPolicyTypeVerticalMem = "vertical_mem" ScalingPolicyOnErrorFail = "fail" ScalingPolicyOnErrorIgnore = "ignore" From 271f7cf4cfb56dbf7953c0887d693f7725d64223 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Wed, 13 Aug 2025 17:07:13 +0200 Subject: [PATCH 10/21] func: update the agent to pick up different policy manager configs --- agent/agent.go | 44 -------------------------------------------- policy/handler.go | 17 +++++++++-------- 2 files changed, 9 insertions(+), 52 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 56e5e2bf..87c3eefc 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -5,7 +5,6 @@ package agent import ( "context" - "errors" "fmt" "os" "os/signal" @@ 
-16,7 +15,6 @@ import ( "github.com/hashicorp/nomad-autoscaler/agent/config" "github.com/hashicorp/nomad-autoscaler/plugins/manager" "github.com/hashicorp/nomad-autoscaler/policy" - filePolicy "github.com/hashicorp/nomad-autoscaler/policy/file" nomadPolicy "github.com/hashicorp/nomad-autoscaler/policy/nomad" "github.com/hashicorp/nomad-autoscaler/sdk" nomadHelper "github.com/hashicorp/nomad-autoscaler/sdk/helper/nomad" @@ -89,48 +87,6 @@ func (a *Agent) Run(ctx context.Context) error { return nil } -func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error { - - // Create our processor, a shared method for performing basic policy - // actions. - cfgDefaults := policy.ConfigDefaults{ - DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval, - DefaultCooldown: a.config.Policy.DefaultCooldown, - } - policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames()) - - // Setup our initial default policy source which is Nomad. - sources := map[policy.SourceName]policy.Source{} - for _, s := range a.config.Policy.Sources { - if s.Enabled == nil || !*s.Enabled { - continue - } - - switch policy.SourceName(s.Name) { - case policy.SourceNameNomad: - sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor) - case policy.SourceNameFile: - // Only setup the file source if operators have configured a - // scaling policy directory to read from. - if a.config.Policy.Dir != "" { - sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor) - } - } - } - - // TODO: Once full policy source reload is implemented this should probably - // be just a warning. - if len(sources) == 0 { - return errors.New("no policy source available") - } - - a.policySources = sources - a.policyManager = policy.NewManager(a.logger, a.policySources, - a.pluginManager, a.config.Telemetry.CollectionInterval, limiter) - - return nil -} - func (a *Agent) stop() { // Kill all the plugins. 
if a.pluginManager != nil { diff --git a/policy/handler.go b/policy/handler.go index 2eb85580..1db27eee 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -119,14 +119,15 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { mutators: []Mutator{ NomadAPMMutator{}, }, - pm: config.DependencyGetter, - targetController: config.TargetController, - updatesCh: config.UpdatesChan, - policy: config.Policy, - errChn: config.ErrChan, - limiter: config.Limiter, - stateLock: sync.RWMutex{}, - state: StateIdle, + pm: config.DependencyGetter, + historicalAPMGetter: config.HistoricalAPMGetter, + targetController: config.TargetController, + updatesCh: config.UpdatesChan, + policy: config.Policy, + errChn: config.ErrChan, + limiter: config.Limiter, + stateLock: sync.RWMutex{}, + state: StateIdle, } switch config.Policy.Type { From 86b2cd3fbc106b0c75e28ee9a1261e15d16e6520 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 14 Aug 2025 15:11:05 +0200 Subject: [PATCH 11/21] func: settle on one checker, not by type --- policy/check.go | 21 +++++++++++-- policy/check_test.go | 2 +- policy/handler.go | 68 ++++++++++++++++++++----------------------- policy/handler_oss.go | 13 ++------- policy/manager.go | 28 +++++++++++------- 5 files changed, 72 insertions(+), 60 deletions(-) diff --git a/policy/check.go b/policy/check.go index 1ec10e06..72df5d42 100644 --- a/policy/check.go +++ b/policy/check.go @@ -51,10 +51,10 @@ func NewCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *check } } -// GetNewCountFromStrategy begins the execution of the checks and returns +// getNewCountFromStrategy begins the execution of the checks and returns // and action containing the instance count after applying the strategy to the // metrics. 
-func (ch *checkRunner) GetNewCountFromStrategy(ctx context.Context, currentCount int64, +func (ch *checkRunner) getNewCountFromStrategy(ctx context.Context, currentCount int64, metrics sdk.TimestampedMetrics) (sdk.ScalingAction, error) { ch.log.Debug("calculating new count", "current count", currentCount) @@ -176,3 +176,20 @@ func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics return ms, nil } + +func (ch *checkRunner) RunCheck(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { + ch.log.Debug("received policy check for evaluation") + + metrics, err := ch.QueryMetrics(ctx) + if err != nil { + return sdk.ScalingAction{}, fmt.Errorf("failed to query source: %v", err) + } + + action, err := ch.getNewCountFromStrategy(ctx, currentCount, metrics) + if err != nil { + return sdk.ScalingAction{}, fmt.Errorf("failed get count from metrics: %v", err) + + } + + return action, nil +} diff --git a/policy/check_test.go b/policy/check_test.go index 78cfaf12..965a1e9e 100644 --- a/policy/check_test.go +++ b/policy/check_test.go @@ -154,7 +154,7 @@ func TestCheckHandler_getNewCountFromMetrics(t *testing.T) { Policy: tt.policy, }, ch) - action, err := runner.GetNewCountFromStrategy(context.Background(), 3, tt.metrics) + action, err := runner.getNewCountFromStrategy(context.Background(), 3, tt.metrics) must.Eq(t, tt.expectedAction, action) must.Eq(t, tt.expError, errors.Unwrap(err)) diff --git a/policy/handler.go b/policy/handler.go index 1db27eee..61135be6 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -94,11 +94,13 @@ type Handler struct { cooldownLock sync.RWMutex outOfCooldownOn time.Time - pm dependencyGetter - historicalAPMGetter historicalAPMGetter - calculateNewCount func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) - minCount int64 - maxCount int64 + pm dependencyGetter + minCount int64 + maxCount int64 + + // Ent only field + evaluateAfter time.Duration + historicalAPMGetter 
HistoricalAPMGetter } type HandlerConfig struct { @@ -109,7 +111,8 @@ type HandlerConfig struct { TargetController targetpkg.Controller Limiter *Limiter DependencyGetter dependencyGetter - HistoricalAPMGetter historicalAPMGetter + HistoricalAPMGetter HistoricalAPMGetter + EvaluateAfter time.Duration } func NewPolicyHandler(config HandlerConfig) (*Handler, error) { @@ -121,6 +124,7 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { }, pm: config.DependencyGetter, historicalAPMGetter: config.HistoricalAPMGetter, + evaluateAfter: config.EvaluateAfter, targetController: config.TargetController, updatesCh: config.UpdatesChan, policy: config.Policy, @@ -175,6 +179,23 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { return h, nil } +// Convert the last event string. +func checkForOutOfBandEvents(status *sdk.TargetStatus) (int64, error) { + // If the target status includes a last event meta key, check for cooldown + // due to out-of-band events. This is also useful if the Autoscaler has + // been re-deployed. + if status.Meta == nil { + return 0, nil + } + + ts, ok := status.Meta[sdk.TargetStatusMetaKeyLastEvent] + if !ok { + return 0, nil + } + + return strconv.ParseInt(ts, 10, 64) +} + func (h *Handler) configureHorizontalPolicy() error { return h.updateHorizontalPolicy(h.policy) } @@ -188,7 +209,6 @@ func (h *Handler) updateHorizontalPolicy(up *sdk.ScalingPolicy) error { h.minCount = up.Min h.maxCount = up.Max - h.calculateNewCount = h.calculateHorizontalNewCount return nil } @@ -219,6 +239,7 @@ func (h *Handler) loadCheckRunners() error { // Do the update as a single operation to avoid partial updates. h.checkRunners = runners + return nil } @@ -364,23 +385,6 @@ func (h *Handler) waitAndScale(ctx context.Context) error { return nil } -// Convert the last event string. 
-func checkForOutOfBandEvents(status *sdk.TargetStatus) (int64, error) { - // If the target status includes a last event meta key, check for cooldown - // due to out-of-band events. This is also useful if the Autoscaler has - // been re-deployed. - if status.Meta == nil { - return 0, nil - } - - ts, ok := status.Meta[sdk.TargetStatusMetaKeyLastEvent] - if !ok { - return 0, nil - } - - return strconv.ParseInt(ts, 10, 64) -} - // updateHandler updates the handler's internal state based on the changes in // the policy being monitored. func (h *Handler) updateHandler(updatedPolicy *sdk.ScalingPolicy) { @@ -436,7 +440,7 @@ func (h *Handler) applyMutators(p *sdk.ScalingPolicy) { // gets the metrics and the necessary new count to keep up with the policy // and generates a scaling action if needed, but only for horizontal policies: // horizontal app and horizontal cluster scaling policies. -func (h *Handler) calculateHorizontalNewCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { +func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { h.log.Debug("received policy for evaluation") // Record the start time of the eval portion of this function. 
The labels @@ -451,24 +455,16 @@ func (h *Handler) calculateHorizontalNewCount(ctx context.Context, currentCount checkGroups := make(map[string][]checkResult) for _, ch := range h.checkRunners { - h.log.Debug("received policy check for evaluation") - - metrics, err := ch.QueryMetrics(ctx) - if err != nil { - return sdk.ScalingAction{}, fmt.Errorf("failed to query source: %v", err) - } - - action, err := ch.GetNewCountFromStrategy(ctx, currentCount, metrics) + action, err := ch.RunCheck(ctx, currentCount) if err != nil { return sdk.ScalingAction{}, fmt.Errorf("failed get count from metrics: %v", err) } - group := ch.check.Group - checkGroups[group] = append(checkGroups[group], checkResult{ + checkGroups[ch.check.Group] = append(checkGroups[ch.check.Group], checkResult{ action: &action, handler: ch, - group: group, + group: ch.check.Group, }) } diff --git a/policy/handler_oss.go b/policy/handler_oss.go index 4dc8b3f1..ec3575d1 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -7,21 +7,14 @@ package policy import ( - "context" - "github.com/hashicorp/nomad-autoscaler/sdk" ) -type historicalAPMGetter interface{} +type HistoricalAPMGetter interface{} -func (h *Handler) configureVerticalPolicy() error { +type noopHistoricalAPMGetter struct{} - h.calculateNewCount = func(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { - return sdk.ScalingAction{ - Count: currentCount, - Reason: "Vertical scaling is not supported in OSS mode", - }, nil - } +func (h *Handler) configureVerticalPolicy() error { return nil } diff --git a/policy/manager.go b/policy/manager.go index cff0242a..a0b9ae02 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -65,23 +65,25 @@ type Manager struct { // Ent only fields evaluateAfter time.Duration - historicalAPMGetter historicalAPMGetter + historicalAPMGetter HistoricalAPMGetter } // NewManager returns a new Manager. 
func NewManager(log hclog.Logger, ps map[SourceName]Source, pm *manager.PluginManager, mInt time.Duration, l *Limiter) *Manager { return &Manager{ - log: log.ResetNamed("policy_manager"), - policySources: ps, - targetGetter: pm, - handlersLock: sync.RWMutex{}, - handlers: make(map[SourceName]map[PolicyID]*handlerTracker), - metricsInterval: mInt, - policyIDsCh: make(chan IDMessage, 2), - policyIDsErrCh: make(chan error, 2), - Limiter: l, - pluginManager: pm, + log: log.ResetNamed("policy_manager"), + policySources: ps, + targetGetter: pm, + handlersLock: sync.RWMutex{}, + handlers: make(map[SourceName]map[PolicyID]*handlerTracker), + metricsInterval: mInt, + policyIDsCh: make(chan IDMessage, 2), + policyIDsErrCh: make(chan error, 2), + Limiter: l, + pluginManager: pm, + evaluateAfter: 0, + historicalAPMGetter: &noopHistoricalAPMGetter{}, } } @@ -368,6 +370,10 @@ func (m *Manager) SetEvaluateAfter(ea time.Duration) { m.evaluateAfter = ea } +func (m *Manager) SetHistoricalAPMGetter(hag HistoricalAPMGetter) { + m.historicalAPMGetter = hag +} + // periodicMetricsReporter periodically emits metrics for the policy manager // which cannot be performed during inline function calls. 
func (m *Manager) periodicMetricsReporter(ctx context.Context, interval time.Duration) { From 4875e70d0300578fa9f8d076ef8799a49d70ab1b Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 14 Aug 2025 17:42:46 +0200 Subject: [PATCH 12/21] func: create the vertical checker for both ent and oss --- policy/check.go | 12 +++-- policy/check_test.go | 2 +- policy/handler.go | 116 +++++++++++++++++------------------------ policy/handler_oss.go | 27 ++++++++-- policy/handler_test.go | 9 ++-- policy/manager.go | 24 ++++----- 6 files changed, 96 insertions(+), 94 deletions(-) diff --git a/policy/check.go b/policy/check.go index 72df5d42..c3b74ea1 100644 --- a/policy/check.go +++ b/policy/check.go @@ -132,7 +132,7 @@ func (ch *checkRunner) runStrategy(ctx context.Context, currentCount int64, ms s } // QueryMetrics wraps the apm.Query call to provide operational functionality. -func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) { +func (ch *checkRunner) queryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) { ch.log.Debug("querying source", "query", ch.check.Query, "source", ch.check.Source) // Trigger a metric measure to track latency of the call. 
@@ -177,10 +177,14 @@ func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics return ms, nil } -func (ch *checkRunner) RunCheck(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { +func (ch *checkRunner) Group() string { + return ch.check.Group +} + +func (ch *checkRunner) RunCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { ch.log.Debug("received policy check for evaluation") - metrics, err := ch.QueryMetrics(ctx) + metrics, err := ch.queryMetrics(ctx) if err != nil { return sdk.ScalingAction{}, fmt.Errorf("failed to query source: %v", err) } @@ -191,5 +195,7 @@ func (ch *checkRunner) RunCheck(ctx context.Context, currentCount int64) (sdk.Sc } + action.CapCount(ch.policy.Min, ch.policy.Max) + return action, nil } diff --git a/policy/check_test.go b/policy/check_test.go index 965a1e9e..ec7324e6 100644 --- a/policy/check_test.go +++ b/policy/check_test.go @@ -234,7 +234,7 @@ func TestCheckHandler_runAPMQuery(t *testing.T) { }, }, check) - result, err := handler.QueryMetrics(context.Background()) + result, err := handler.queryMetrics(context.Background()) must.Eq(t, tc.expResult, result) must.True(t, errors.Is(err, tc.expErr)) }) diff --git a/policy/handler.go b/policy/handler.go index 61135be6..81c3260f 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -60,6 +60,11 @@ type limiter interface { ReleaseSlot(p *sdk.ScalingPolicy) } +type checker interface { + RunCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) + Group() string +} + // Handler monitors a policy for changes and controls when them are sent for // evaluation. 
type Handler struct { @@ -71,7 +76,7 @@ type Handler struct { policyLock sync.RWMutex policy *sdk.ScalingPolicy - checkRunners []*checkRunner + checkRunners []checker targetController targetpkg.Controller @@ -94,9 +99,7 @@ type Handler struct { cooldownLock sync.RWMutex outOfCooldownOn time.Time - pm dependencyGetter - minCount int64 - maxCount int64 + pm dependencyGetter // Ent only field evaluateAfter time.Duration @@ -134,18 +137,7 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { state: StateIdle, } - switch config.Policy.Type { - case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: - err := h.configureHorizontalPolicy() - if err != nil { - return nil, fmt.Errorf("failed to configure horizontal policy: %w", err) - } - default: - err := h.configureVerticalPolicy() - if err != nil { - return nil, fmt.Errorf("failed to configure vertical policy: %w", err) - } - } + h.loadCheckRunners() currentStatus, err := h.runTargetStatus() if err != nil { @@ -196,50 +188,46 @@ func checkForOutOfBandEvents(status *sdk.TargetStatus) (int64, error) { return strconv.ParseInt(ts, 10, 64) } -func (h *Handler) configureHorizontalPolicy() error { - return h.updateHorizontalPolicy(h.policy) -} +func (h *Handler) loadCheckRunners() error { + runners := []checker{} -func (h *Handler) updateHorizontalPolicy(up *sdk.ScalingPolicy) error { - err := h.loadCheckRunners() - if err != nil { - return fmt.Errorf("failed to load check handlers: %w", err) - } + switch h.policy.Type { + case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: + for _, check := range h.policy.Checks { - h.minCount = up.Min - h.maxCount = up.Max + s, err := h.pm.GetStrategyRunner(check.Strategy.Name) + if err != nil { + return fmt.Errorf("failed to get strategy %s: %w", check.Strategy.Name, err) + } - return nil -} + mg, err := h.pm.GetAPMLooker(check.Source) + if err != nil { + return fmt.Errorf("failed to get APM for strategy %s: %w", check.Strategy.Name, err) + } -func (h 
*Handler) loadCheckRunners() error { - runners := []*checkRunner{} + runner := NewCheckRunner(&CheckRunnerConfig{ + Log: h.log.Named("check_handler").With("check", check.Name, + "source", check.Source, "strategy", check.Strategy.Name), + StrategyRunner: s, + MetricsGetter: mg, + Policy: h.policy, + }, check) + + runners = append(runners, runner) - for _, check := range h.policy.Checks { - s, err := h.pm.GetStrategyRunner(check.Strategy.Name) - if err != nil { - return fmt.Errorf("failed to get strategy %s: %w", check.Strategy.Name, err) } - mg, err := h.pm.GetAPMLooker(check.Source) + case sdk.ScalingPolicyTypeVerticalCPU, sdk.ScalingPolicyTypeVerticalMem: + runner, err := h.loadVerticalCheckRunner() if err != nil { - return fmt.Errorf("failed to get APM for strategy %s: %w", check.Strategy.Name, err) + return fmt.Errorf("failed to load vertical check %s: %w", h.policy.Type, err) } - runner := NewCheckRunner(&CheckRunnerConfig{ - Log: h.log.Named("check_handler").With("check", check.Name, - "source", check.Source, "strategy", check.Strategy.Name), - StrategyRunner: s, - MetricsGetter: mg, - Policy: h.policy, - }, check) - runners = append(runners, runner) } // Do the update as a single operation to avoid partial updates. h.checkRunners = runners - return nil } @@ -297,10 +285,6 @@ func (h *Handler) Run(ctx context.Context) { continue } - // Canonicalize action so plugins don't have to. 
- action.Canonicalize() - action.CapCount(h.minCount, h.maxCount) - h.log.Info("calculating scaling target", "policy_id", h.policy.ID, "from", currentCount, "to", action.Count, "reason", action.Reason, "meta", action.Meta) @@ -406,24 +390,17 @@ func (h *Handler) updateHandler(updatedPolicy *sdk.ScalingPolicy) { h.policyLock.Lock() defer h.policyLock.Unlock() - switch updatedPolicy.Type { - case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: - err := h.updateHorizontalPolicy(updatedPolicy) - if err != nil { - h.errChn <- fmt.Errorf("unable to update horizontal policy %w", err) - return - } + // Clear existing check handlers and load new ones. + h.checkRunners = nil - default: - err := h.updateVerticalPolicy(updatedPolicy) - if err != nil { - h.errChn <- fmt.Errorf("unable to update vertical policy %w", err) - return - } + err := h.loadCheckRunners() + if err != nil { + h.errChn <- fmt.Errorf("unable to update policy, failed to load check handlers: %w", err) + return } + h.log.Debug("check handlers updated", "count", len(h.checkRunners)) h.policy = updatedPolicy - h.log.Debug("policy successfully updated") } // applyMutators applies the mutators registered with the handler in order and @@ -455,16 +432,17 @@ func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sd checkGroups := make(map[string][]checkResult) for _, ch := range h.checkRunners { - action, err := ch.RunCheck(ctx, currentCount) + action, err := ch.RunCheckAndCapCount(ctx, currentCount) if err != nil { return sdk.ScalingAction{}, fmt.Errorf("failed get count from metrics: %v", err) } - checkGroups[ch.check.Group] = append(checkGroups[ch.check.Group], checkResult{ + g := ch.Group() + checkGroups[g] = append(checkGroups[g], checkResult{ action: &action, handler: ch, - group: ch.check.Group, + group: g, }) } @@ -473,8 +451,8 @@ func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sd return sdk.ScalingAction{}, nil } - h.log.Debug("check 
selected", "name", winner.handler.check.Name, - "direction", winner.action.Direction, "count", winner.action.Count) + h.log.Debug("check selected", "direction", winner.action.Direction, + "count", winner.action.Count) // At this point the checks have finished. Therefore emit of metric data // tracking how long it takes to run all the checks within a policy. @@ -635,7 +613,7 @@ func calculateRemainingCooldown(cd time.Duration, ts, lastEvent int64) time.Dura type checkResult struct { action *sdk.ScalingAction - handler *checkRunner + handler checker group string } diff --git a/policy/handler_oss.go b/policy/handler_oss.go index ec3575d1..5bc29af4 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -7,6 +7,8 @@ package policy import ( + "context" + "github.com/hashicorp/nomad-autoscaler/sdk" ) @@ -14,10 +16,27 @@ type HistoricalAPMGetter interface{} type noopHistoricalAPMGetter struct{} -func (h *Handler) configureVerticalPolicy() error { - return nil +type noopVerticalCheckRunner struct { + policy *sdk.ScalingPolicy +} + +func (nv *noopVerticalCheckRunner) RunCheckAndCapCount(_ context.Context, currentCount int64) (sdk.ScalingAction, error) { + a := sdk.ScalingAction{ + Direction: sdk.ScaleDirectionNone, + Count: currentCount, + } + + a.CapCount(nv.policy.Min, nv.policy.Max) + + return a, nil +} + +func (nv *noopVerticalCheckRunner) Group() string { + return "" } -func (h *Handler) updateVerticalPolicy(up *sdk.ScalingPolicy) error { - return h.configureVerticalPolicy() +func (h *Handler) loadVerticalCheckRunner() (*noopVerticalCheckRunner, error) { + return &noopVerticalCheckRunner{ + policy: h.policy, + }, nil } diff --git a/policy/handler_test.go b/policy/handler_test.go index f7baa154..eb28afcb 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -290,7 +290,7 @@ func Test_pickWinnerActionFromGroups(t *testing.T) { must.Eq(t, tt.wantAction.Direction, result.action.Direction) must.Eq(t, tt.wantAction.Count, result.action.Count) 
must.NotNil(t, result.handler) - must.Eq(t, tt.wantHandler.check.Name, result.handler.check.Name) + //must.Eq(t, tt.wantHandler.check.Name, result.handler.check.Name) } }) } @@ -467,7 +467,7 @@ func TestHandler_Run_ScalingNotNeeded_Integration(t *testing.T) { pm: mdg, } - must.NoError(t, handler.configureHorizontalPolicy()) + must.NoError(t, handler.loadCheckRunners()) go handler.Run(ctx) time.Sleep(30 * time.Millisecond) @@ -530,7 +530,7 @@ func TestHandler_Run_ScalingNeededAndCooldown_Integration(t *testing.T) { limiter: ml, } - must.NoError(t, handler.configureHorizontalPolicy()) + must.NoError(t, handler.loadCheckRunners()) go handler.Run(ctx) time.Sleep(30 * time.Millisecond) @@ -686,7 +686,7 @@ func TestHandler_Run_StateChanges_Integration(t *testing.T) { nextAction: sdk.ScalingAction{}, } - must.NoError(t, handler.configureHorizontalPolicy()) + must.NoError(t, handler.loadCheckRunners()) go handler.Run(ctx) time.Sleep(30 * time.Millisecond) @@ -704,5 +704,4 @@ func TestHandler_Run_StateChanges_Integration(t *testing.T) { }, handler.getNextAction()) }) } - } diff --git a/policy/manager.go b/policy/manager.go index a0b9ae02..bcae6704 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -72,18 +72,18 @@ type Manager struct { func NewManager(log hclog.Logger, ps map[SourceName]Source, pm *manager.PluginManager, mInt time.Duration, l *Limiter) *Manager { return &Manager{ - log: log.ResetNamed("policy_manager"), - policySources: ps, - targetGetter: pm, - handlersLock: sync.RWMutex{}, - handlers: make(map[SourceName]map[PolicyID]*handlerTracker), - metricsInterval: mInt, - policyIDsCh: make(chan IDMessage, 2), - policyIDsErrCh: make(chan error, 2), - Limiter: l, - pluginManager: pm, - evaluateAfter: 0, - historicalAPMGetter: &noopHistoricalAPMGetter{}, + log: log.ResetNamed("policy_manager"), + policySources: ps, + targetGetter: pm, + handlersLock: sync.RWMutex{}, + handlers: make(map[SourceName]map[PolicyID]*handlerTracker), + metricsInterval: mInt, + 
policyIDsCh: make(chan IDMessage, 2), + policyIDsErrCh: make(chan error, 2), + Limiter: l, + pluginManager: pm, + evaluateAfter: 0, + //historicalAPMGetter: &noopHistoricalAPMGetter{}, } } From 3cc8e33d41c026bf40ec2858f3a83aa60c63d76d Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 15 Aug 2025 09:39:16 +0200 Subject: [PATCH 13/21] fix: add policy type to test and add validation for type plus fix validation for nil policy --- policy/handler_test.go | 1 + sdk/policy.go | 7 ++++++- sdk/policy_test.go | 9 ++++++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/policy/handler_test.go b/policy/handler_test.go index eb28afcb..722fc324 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -399,6 +399,7 @@ func TestHandler_Run_TargetNotReady_Integration(t *testing.T) { } var policy = &sdk.ScalingPolicy{ + Type: sdk.ScalingPolicyTypeHorizontal, ID: "test-policy", EvaluationInterval: 20 * time.Millisecond, Min: 1, diff --git a/sdk/policy.go b/sdk/policy.go index 6612f5fb..9e0569b3 100644 --- a/sdk/policy.go +++ b/sdk/policy.go @@ -4,6 +4,7 @@ package sdk import ( + "errors" "fmt" "strings" "time" @@ -86,11 +87,15 @@ type ScalingPolicy struct { // Validate applies validation rules that are independent of policy source. 
func (p *ScalingPolicy) Validate() error { if p == nil { - return nil + return errors.New("empty policy") } var result *multierror.Error + if p.Type == "" { + result = multierror.Append(result, errors.New("policy has not type defined")) + } + switch p.OnCheckError { case "", ScalingPolicyOnErrorFail, ScalingPolicyOnErrorIgnore: default: diff --git a/sdk/policy_test.go b/sdk/policy_test.go index 62a6f540..f8abfc85 100644 --- a/sdk/policy_test.go +++ b/sdk/policy_test.go @@ -19,7 +19,14 @@ func TestScalingPolicy_Validate(t *testing.T) { { name: "nil policy", policy: nil, - expectedError: "", + expectedError: "empty policy", + }, + { + name: "no type", + policy: &ScalingPolicy{ + Type: "", + }, + expectedError: "policy has not type defined", }, { name: "invalid on_check_error", From 3adf812458154a2a7ee87cd40959084392c85944 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 15 Aug 2025 10:30:39 +0200 Subject: [PATCH 14/21] func: add policy type to tests policies --- policy/manager_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/policy/manager_test.go b/policy/manager_test.go index 9b2dd6ff..920abaf3 100644 --- a/policy/manager_test.go +++ b/policy/manager_test.go @@ -121,6 +121,7 @@ func (ms *mockSource) ReloadIDsMonitor() var policy1 = &sdk.ScalingPolicy{ ID: "policy1", + Type: sdk.ScalingPolicyTypeCluster, Enabled: true, Checks: []*sdk.ScalingPolicyCheck{ { @@ -147,6 +148,7 @@ var policy1 = &sdk.ScalingPolicy{ var policy2 = &sdk.ScalingPolicy{ ID: "policy2", Enabled: true, + Type: sdk.ScalingPolicyTypeVerticalCPU, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", @@ -164,6 +166,7 @@ var policy2 = &sdk.ScalingPolicy{ var policy3 = &sdk.ScalingPolicy{ ID: "policy3", + Type: sdk.ScalingPolicyTypeHorizontal, Enabled: true, Checks: []*sdk.ScalingPolicyCheck{ { @@ -552,7 +555,8 @@ func TestProcessMessageAndUpdateHandlers_GetTargetReporterError(t *testing.T) { name: 
"mock-source", latestVersion: map[PolicyID]*sdk.ScalingPolicy{ "policy1": { - ID: "policy1", + ID: "policy1", + Type: sdk.ScalingPolicyTypeCluster, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", @@ -563,7 +567,8 @@ func TestProcessMessageAndUpdateHandlers_GetTargetReporterError(t *testing.T) { }, }, "policy2": { - ID: "policy2", + ID: "policy2", + Type: sdk.ScalingPolicyTypeHorizontal, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", @@ -574,7 +579,8 @@ func TestProcessMessageAndUpdateHandlers_GetTargetReporterError(t *testing.T) { }, }, "policy3": { - ID: "policy3", + ID: "policy3", + Type: sdk.ScalingPolicyTypeVerticalCPU, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", From 28d22f7efe5025f38a7aed7088fc72bf39b831b1 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 15 Aug 2025 10:37:13 +0200 Subject: [PATCH 15/21] fix: check and report error on loading the check runners --- policy/handler.go | 5 ++++- policy/handler_oss.go | 2 -- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/policy/handler.go b/policy/handler.go index 81c3260f..c617773f 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -137,7 +137,10 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { state: StateIdle, } - h.loadCheckRunners() + err := h.loadCheckRunners() + if err != nil { + return nil, fmt.Errorf("unable to load the checks for the handler: %w", err) + } currentStatus, err := h.runTargetStatus() if err != nil { diff --git a/policy/handler_oss.go b/policy/handler_oss.go index 5bc29af4..b1b447fd 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -14,8 +14,6 @@ import ( type HistoricalAPMGetter interface{} -type noopHistoricalAPMGetter struct{} - type noopVerticalCheckRunner struct { policy *sdk.ScalingPolicy } From 2633787a5f0af55b0c966128895bb2ff3a4c9019 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 15 Aug 
2025 11:12:17 +0200 Subject: [PATCH 16/21] func: put the policy manager init back to common code --- agent/agent.go | 44 +++++++++++++++++++++++++++++++++++++++++++ agent/agent_oss.go | 47 ---------------------------------------------- 2 files changed, 44 insertions(+), 47 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 87c3eefc..84dc6a18 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -5,6 +5,7 @@ package agent import ( "context" + "errors" "fmt" "os" "os/signal" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/nomad-autoscaler/agent/config" "github.com/hashicorp/nomad-autoscaler/plugins/manager" "github.com/hashicorp/nomad-autoscaler/policy" + filePolicy "github.com/hashicorp/nomad-autoscaler/policy/file" nomadPolicy "github.com/hashicorp/nomad-autoscaler/policy/nomad" "github.com/hashicorp/nomad-autoscaler/sdk" nomadHelper "github.com/hashicorp/nomad-autoscaler/sdk/helper/nomad" @@ -145,6 +147,48 @@ func (a *Agent) reload() { } } +func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error { + + // Create our processor, a shared method for performing basic policy + // actions. + cfgDefaults := policy.ConfigDefaults{ + DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval, + DefaultCooldown: a.config.Policy.DefaultCooldown, + } + policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames()) + + // Setup our initial default policy source which is Nomad. + sources := map[policy.SourceName]policy.Source{} + for _, s := range a.config.Policy.Sources { + if s.Enabled == nil || !*s.Enabled { + continue + } + + switch policy.SourceName(s.Name) { + case policy.SourceNameNomad: + sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor) + case policy.SourceNameFile: + // Only setup the file source if operators have configured a + // scaling policy directory to read from. 
+ if a.config.Policy.Dir != "" { + sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor) + } + } + } + + // TODO: Once full policy source reload is implemented this should probably + // be just a warning. + if len(sources) == 0 { + return errors.New("no policy source available") + } + + a.policySources = sources + a.policyManager = policy.NewManager(a.logger, a.policySources, + a.pluginManager, a.config.Telemetry.CollectionInterval, limiter) + + return nil +} + // handleSignals blocks until the agent receives an exit signal. func (a *Agent) handleSignals() { diff --git a/agent/agent_oss.go b/agent/agent_oss.go index 1bd7b84d..1c280b6d 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -8,55 +8,8 @@ package agent import ( "context" - "errors" - - "github.com/hashicorp/nomad-autoscaler/policy" - filePolicy "github.com/hashicorp/nomad-autoscaler/policy/file" - nomadPolicy "github.com/hashicorp/nomad-autoscaler/policy/nomad" ) -func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error { - - // Create our processor, a shared method for performing basic policy - // actions. - cfgDefaults := policy.ConfigDefaults{ - DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval, - DefaultCooldown: a.config.Policy.DefaultCooldown, - } - policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames()) - - // Setup our initial default policy source which is Nomad. - sources := map[policy.SourceName]policy.Source{} - for _, s := range a.config.Policy.Sources { - if s.Enabled == nil || !*s.Enabled { - continue - } - - switch policy.SourceName(s.Name) { - case policy.SourceNameNomad: - sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor) - case policy.SourceNameFile: - // Only setup the file source if operators have configured a - // scaling policy directory to read from. 
- if a.config.Policy.Dir != "" { - sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor) - } - } - } - - // TODO: Once full policy source reload is implemented this should probably - // be just a warning. - if len(sources) == 0 { - return errors.New("no policy source available") - } - - a.policySources = sources - a.policyManager = policy.NewManager(a.logger, a.policySources, - a.pluginManager, a.config.Telemetry.CollectionInterval, limiter) - - return nil -} - func (a *Agent) initEnt(ctx context.Context, reload <-chan any) { go func() { for { From 8765b8fc2944b6c5b68f80a832a9a2a86688b8ad Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 15 Aug 2025 15:09:20 +0200 Subject: [PATCH 17/21] func: pass the historical apm getter from manager to handler --- policy/manager.go | 39 +++++++++++++++++++------------------ policy/manager_test.go | 2 +- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/policy/manager.go b/policy/manager.go index bcae6704..f50b15f1 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -68,22 +68,24 @@ type Manager struct { historicalAPMGetter HistoricalAPMGetter } +type noopHistoricalAPMGetter struct{} + // NewManager returns a new Manager. 
func NewManager(log hclog.Logger, ps map[SourceName]Source, pm *manager.PluginManager, mInt time.Duration, l *Limiter) *Manager { return &Manager{ - log: log.ResetNamed("policy_manager"), - policySources: ps, - targetGetter: pm, - handlersLock: sync.RWMutex{}, - handlers: make(map[SourceName]map[PolicyID]*handlerTracker), - metricsInterval: mInt, - policyIDsCh: make(chan IDMessage, 2), - policyIDsErrCh: make(chan error, 2), - Limiter: l, - pluginManager: pm, - evaluateAfter: 0, - //historicalAPMGetter: &noopHistoricalAPMGetter{}, + log: log.ResetNamed("policy_manager"), + policySources: ps, + targetGetter: pm, + handlersLock: sync.RWMutex{}, + handlers: make(map[SourceName]map[PolicyID]*handlerTracker), + metricsInterval: mInt, + policyIDsCh: make(chan IDMessage, 2), + policyIDsErrCh: make(chan error, 2), + Limiter: l, + pluginManager: pm, + evaluateAfter: 0, + historicalAPMGetter: &noopHistoricalAPMGetter{}, } } @@ -280,12 +282,13 @@ func (m *Manager) processMessageAndUpdateHandlers(ctx context.Context, message I Log: m.log.Named("policy_handler").With("policy_id", policyID, "source", message.Source, "target", updatedPolicy.Target.Name, "target_config", updatedPolicy.Target.Config), - Policy: updatedPolicy, - UpdatesChan: upCh, - ErrChan: m.policyIDsErrCh, - TargetController: tg, - DependencyGetter: m.pluginManager, - Limiter: m.Limiter, + Policy: updatedPolicy, + UpdatesChan: upCh, + ErrChan: m.policyIDsErrCh, + TargetController: tg, + DependencyGetter: m.pluginManager, + Limiter: m.Limiter, + HistoricalAPMGetter: m.historicalAPMGetter, }) if err != nil { m.log.Error("encountered an error starting the policy handler", diff --git a/policy/manager_test.go b/policy/manager_test.go index 920abaf3..b925a3c1 100644 --- a/policy/manager_test.go +++ b/policy/manager_test.go @@ -148,7 +148,7 @@ var policy1 = &sdk.ScalingPolicy{ var policy2 = &sdk.ScalingPolicy{ ID: "policy2", Enabled: true, - Type: sdk.ScalingPolicyTypeVerticalCPU, + Type: sdk.ScalingPolicyTypeHorizontal, 
Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", From 158d587a8b60d4a237c206869cadfbf1ebaca49e Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 15 Aug 2025 15:42:58 +0200 Subject: [PATCH 18/21] func: add default historical apm getter to avoid panics --- policy/handler_oss.go | 2 ++ policy/manager.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/policy/handler_oss.go b/policy/handler_oss.go index b1b447fd..5bc29af4 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -14,6 +14,8 @@ import ( type HistoricalAPMGetter interface{} +type noopHistoricalAPMGetter struct{} + type noopVerticalCheckRunner struct { policy *sdk.ScalingPolicy } diff --git a/policy/manager.go b/policy/manager.go index f50b15f1..396b8be9 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -68,8 +68,6 @@ type Manager struct { historicalAPMGetter HistoricalAPMGetter } -type noopHistoricalAPMGetter struct{} - // NewManager returns a new Manager. 
func NewManager(log hclog.Logger, ps map[SourceName]Source, pm *manager.PluginManager, mInt time.Duration, l *Limiter) *Manager { From d2df3081464bcd5eee10d9991ca3a0cd456e2899 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 5 Sep 2025 16:01:40 +0200 Subject: [PATCH 19/21] func: set the check for action direction into only the scaling needed test and account for vertical scaling where direction is none --- policy/handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/policy/handler.go b/policy/handler.go index c617773f..e4b30e9d 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -332,7 +332,8 @@ func (h *Handler) Run(ctx context.Context) { } func scalingNeeded(a sdk.ScalingAction, countCount int64) bool { - return a.Direction != sdk.ScaleDirectionNone && countCount != a.Count + return (a.Direction == sdk.ScaleDirectionNone && countCount != a.Count) || + a.Direction != sdk.ScaleDirectionNone } func (h *Handler) waitAndScale(ctx context.Context) error { @@ -450,7 +451,7 @@ func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sd } winner := pickWinnerActionFromGroups(checkGroups) - if winner.handler == nil || winner.action == nil || winner.action.Direction == sdk.ScaleDirectionNone { + if winner.handler == nil || winner.action == nil { return sdk.ScalingAction{}, nil } From 5b0e9997076b13673930b756fff135af88f8a94b Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 5 Sep 2025 17:03:04 +0200 Subject: [PATCH 20/21] style: clean up comments --- policy/handler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/policy/handler.go b/policy/handler.go index e4b30e9d..aa7a0241 100644 --- a/policy/handler.go +++ b/policy/handler.go @@ -332,6 +332,8 @@ func (h *Handler) Run(ctx context.Context) { } func scalingNeeded(a sdk.ScalingAction, countCount int64) bool { + // The DAS
returns count but the direction is none, for vertical and horizontal + // policies checking the direction is enough. return (a.Direction == sdk.ScaleDirectionNone && countCount != a.Count) || a.Direction != sdk.ScaleDirectionNone } @@ -417,10 +419,9 @@ func (h *Handler) applyMutators(p *sdk.ScalingPolicy) { } } -// calculateHorizontalNewCount is the main part of the controller, it +// calculateNewCount is the main part of the controller, it // gets the metrics and the necessary new count to keep up with the policy -// and generates a scaling action if needed, but only for horizontal policies: -// horizontal app and horizontal cluster scaling policies. +// and generates a scaling action if needed. func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { h.log.Debug("received policy for evaluation") From 3695f131fa7080a56cfd531bbff999101b6a9a9b Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:44:17 +0200 Subject: [PATCH 21/21] func: set all the checks functions and types as private, remove the check for nil policy --- policy/check.go | 8 ++++---- policy/check_test.go | 4 ++-- policy/handler.go | 36 +++++++++++++++++++----------------- policy/handler_oss.go | 4 ++-- policy/handler_test.go | 1 - sdk/policy.go | 2 +- sdk/policy_test.go | 5 ----- 7 files changed, 28 insertions(+), 32 deletions(-) diff --git a/policy/check.go b/policy/check.go index c3b74ea1..1245efb1 100644 --- a/policy/check.go +++ b/policy/check.go @@ -41,7 +41,7 @@ type checkRunner struct { } // NewCheckHandler returns a new checkHandler instance. 
-func NewCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *checkRunner { +func newCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *checkRunner { return &checkRunner{ log: config.Log, check: c, @@ -131,7 +131,7 @@ func (ch *checkRunner) runStrategy(ctx context.Context, currentCount int64, ms s return *runResp.Action, nil } -// QueryMetrics wraps the apm.Query call to provide operational functionality. +// queryMetrics wraps the apm.Query call to provide operational functionality. func (ch *checkRunner) queryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) { ch.log.Debug("querying source", "query", ch.check.Query, "source", ch.check.Source) @@ -177,11 +177,11 @@ func (ch *checkRunner) queryMetrics(ctx context.Context) (sdk.TimestampedMetrics return ms, nil } -func (ch *checkRunner) Group() string { +func (ch *checkRunner) group() string { return ch.check.Group } -func (ch *checkRunner) RunCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { +func (ch *checkRunner) runCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { ch.log.Debug("received policy check for evaluation") metrics, err := ch.queryMetrics(ctx) diff --git a/policy/check_test.go b/policy/check_test.go index ec7324e6..3a3660b3 100644 --- a/policy/check_test.go +++ b/policy/check_test.go @@ -148,7 +148,7 @@ func TestCheckHandler_getNewCountFromMetrics(t *testing.T) { }, } - runner := NewCheckRunner(&CheckRunnerConfig{ + runner := newCheckRunner(&CheckRunnerConfig{ Log: hclog.NewNullLogger(), StrategyRunner: sr, Policy: tt.policy, @@ -226,7 +226,7 @@ func TestCheckHandler_runAPMQuery(t *testing.T) { }, } - handler := NewCheckRunner(&CheckRunnerConfig{ + handler := newCheckRunner(&CheckRunnerConfig{ Log: hclog.NewNullLogger(), MetricsGetter: mockLooker, Policy: &sdk.ScalingPolicy{ diff --git a/policy/handler.go b/policy/handler.go index aa7a0241..8a7e946e 100644 --- a/policy/handler.go +++ 
b/policy/handler.go @@ -61,8 +61,8 @@ type limiter interface { } type checker interface { - RunCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) - Group() string + runCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) + group() string } // Handler monitors a policy for changes and controls when them are sent for @@ -107,27 +107,26 @@ type Handler struct { } type HandlerConfig struct { - UpdatesChan chan *sdk.ScalingPolicy - ErrChan chan<- error - Policy *sdk.ScalingPolicy - Log hclog.Logger - TargetController targetpkg.Controller - Limiter *Limiter - DependencyGetter dependencyGetter + UpdatesChan chan *sdk.ScalingPolicy + ErrChan chan<- error + Policy *sdk.ScalingPolicy + Log hclog.Logger + TargetController targetpkg.Controller + Limiter *Limiter + DependencyGetter dependencyGetter + + // Ent only field HistoricalAPMGetter HistoricalAPMGetter EvaluateAfter time.Duration } func NewPolicyHandler(config HandlerConfig) (*Handler, error) { - h := &Handler{ log: config.Log, mutators: []Mutator{ NomadAPMMutator{}, }, pm: config.DependencyGetter, - historicalAPMGetter: config.HistoricalAPMGetter, - evaluateAfter: config.EvaluateAfter, targetController: config.TargetController, updatesCh: config.UpdatesChan, policy: config.Policy, @@ -135,11 +134,13 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { limiter: config.Limiter, stateLock: sync.RWMutex{}, state: StateIdle, + historicalAPMGetter: config.HistoricalAPMGetter, + evaluateAfter: config.EvaluateAfter, } err := h.loadCheckRunners() if err != nil { - return nil, fmt.Errorf("unable to load the checks for the handler: %w", err) + return nil, fmt.Errorf("failed to load check handlers: %w", err) } currentStatus, err := h.runTargetStatus() @@ -174,7 +175,8 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { return h, nil } -// Convert the last event string. 
+// checkForOutOfBandEvents tries to determine if there has been any recent +// scaling events in case of the autoscaler going offline. func checkForOutOfBandEvents(status *sdk.TargetStatus) (int64, error) { // If the target status includes a last event meta key, check for cooldown // due to out-of-band events. This is also useful if the Autoscaler has @@ -208,7 +210,7 @@ func (h *Handler) loadCheckRunners() error { return fmt.Errorf("failed to get APM for strategy %s: %w", check.Strategy.Name, err) } - runner := NewCheckRunner(&CheckRunnerConfig{ + runner := newCheckRunner(&CheckRunnerConfig{ Log: h.log.Named("check_handler").With("check", check.Name, "source", check.Source, "strategy", check.Strategy.Name), StrategyRunner: s, @@ -437,13 +439,13 @@ func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sd checkGroups := make(map[string][]checkResult) for _, ch := range h.checkRunners { - action, err := ch.RunCheckAndCapCount(ctx, currentCount) + action, err := ch.runCheckAndCapCount(ctx, currentCount) if err != nil { return sdk.ScalingAction{}, fmt.Errorf("failed get count from metrics: %v", err) } - g := ch.Group() + g := ch.group() checkGroups[g] = append(checkGroups[g], checkResult{ action: &action, handler: ch, diff --git a/policy/handler_oss.go b/policy/handler_oss.go index 5bc29af4..6cb29bce 100644 --- a/policy/handler_oss.go +++ b/policy/handler_oss.go @@ -20,7 +20,7 @@ type noopVerticalCheckRunner struct { policy *sdk.ScalingPolicy } -func (nv *noopVerticalCheckRunner) RunCheckAndCapCount(_ context.Context, currentCount int64) (sdk.ScalingAction, error) { +func (nv *noopVerticalCheckRunner) runCheckAndCapCount(_ context.Context, currentCount int64) (sdk.ScalingAction, error) { a := sdk.ScalingAction{ Direction: sdk.ScaleDirectionNone, Count: currentCount, @@ -31,7 +31,7 @@ func (nv *noopVerticalCheckRunner) RunCheckAndCapCount(_ context.Context, curren return a, nil } -func (nv *noopVerticalCheckRunner) Group() string { +func (nv 
*noopVerticalCheckRunner) group() string { return "" } diff --git a/policy/handler_test.go b/policy/handler_test.go index 722fc324..03d72aa4 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -290,7 +290,6 @@ func Test_pickWinnerActionFromGroups(t *testing.T) { must.Eq(t, tt.wantAction.Direction, result.action.Direction) must.Eq(t, tt.wantAction.Count, result.action.Count) must.NotNil(t, result.handler) - //must.Eq(t, tt.wantHandler.check.Name, result.handler.check.Name) } }) } diff --git a/sdk/policy.go b/sdk/policy.go index 9e0569b3..59c11200 100644 --- a/sdk/policy.go +++ b/sdk/policy.go @@ -87,7 +87,7 @@ type ScalingPolicy struct { // Validate applies validation rules that are independent of policy source. func (p *ScalingPolicy) Validate() error { if p == nil { - return errors.New("empty policy") + return nil } var result *multierror.Error diff --git a/sdk/policy_test.go b/sdk/policy_test.go index f8abfc85..cd8b2cbb 100644 --- a/sdk/policy_test.go +++ b/sdk/policy_test.go @@ -16,11 +16,6 @@ func TestScalingPolicy_Validate(t *testing.T) { policy *ScalingPolicy expectedError string }{ - { - name: "nil policy", - policy: nil, - expectedError: "empty policy", - }, { name: "no type", policy: &ScalingPolicy{