diff --git a/agent/agent.go b/agent/agent.go index 236ea114..84dc6a18 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -74,12 +74,12 @@ func (a *Agent) Run(ctx context.Context) error { defer close(policyEvalCh) limiter := policy.NewLimiter(policy.DefaultLimiterTimeout, - a.config.PolicyEval.Workers["horizontal"], - a.config.PolicyEval.Workers["cluster"]) + a.config.PolicyEval.Workers) if err := a.setupPolicyManager(limiter); err != nil { return fmt.Errorf("failed to setup policy manager: %v", err) } + go a.policyManager.Run(ctx, policyEvalCh) a.initEnt(ctx, a.entReload) @@ -89,48 +89,6 @@ func (a *Agent) Run(ctx context.Context) error { return nil } -func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error { - - // Create our processor, a shared method for performing basic policy - // actions. - cfgDefaults := policy.ConfigDefaults{ - DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval, - DefaultCooldown: a.config.Policy.DefaultCooldown, - } - policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames()) - - // Setup our initial default policy source which is Nomad. - sources := map[policy.SourceName]policy.Source{} - for _, s := range a.config.Policy.Sources { - if s.Enabled == nil || !*s.Enabled { - continue - } - - switch policy.SourceName(s.Name) { - case policy.SourceNameNomad: - sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor) - case policy.SourceNameFile: - // Only setup the file source if operators have configured a - // scaling policy directory to read from. - if a.config.Policy.Dir != "" { - sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor) - } - } - } - - // TODO: Once full policy source reload is implemented this should probably - // be just a warning. - if len(sources) == 0 { - return errors.New("no policy source available") - } - - a.policySources = sources - a.policyManager = policy.NewManager(a.logger, a.policySources, - a.pluginManager, a.config.Telemetry.CollectionInterval, limiter) - - return nil -} - func (a *Agent) stop() { // Kill all the plugins. if a.pluginManager != nil { @@ -189,6 +147,48 @@ func (a *Agent) reload() { } } +func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error { + + // Create our processor, a shared method for performing basic policy + // actions. + cfgDefaults := policy.ConfigDefaults{ + DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval, + DefaultCooldown: a.config.Policy.DefaultCooldown, + } + policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames()) + + // Setup our initial default policy source which is Nomad. + sources := map[policy.SourceName]policy.Source{} + for _, s := range a.config.Policy.Sources { + if s.Enabled == nil || !*s.Enabled { + continue + } + + switch policy.SourceName(s.Name) { + case policy.SourceNameNomad: + sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor) + case policy.SourceNameFile: + // Only setup the file source if operators have configured a + // scaling policy directory to read from. + if a.config.Policy.Dir != "" { + sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor) + } + } + } + + // TODO: Once full policy source reload is implemented this should probably + // be just a warning. 
+ if len(sources) == 0 { + return errors.New("no policy source available") + } + + a.policySources = sources + a.policyManager = policy.NewManager(a.logger, a.policySources, + a.pluginManager, a.config.Telemetry.CollectionInterval, limiter) + + return nil +} + // handleSignals blocks until the agent receives an exit signal. func (a *Agent) handleSignals() { diff --git a/agent/agent_oss.go b/agent/agent_oss.go index c03dc3c3..1c280b6d 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -6,7 +6,9 @@ package agent -import "context" +import ( + "context" +) func (a *Agent) initEnt(ctx context.Context, reload <-chan any) { go func() { diff --git a/policy/check.go b/policy/check.go index 1ec10e06..1245efb1 100644 --- a/policy/check.go +++ b/policy/check.go @@ -41,7 +41,7 @@ type checkRunner struct { } // NewCheckHandler returns a new checkHandler instance. -func NewCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *checkRunner { +func newCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *checkRunner { return &checkRunner{ log: config.Log, check: c, @@ -51,10 +51,10 @@ func NewCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *check } } -// GetNewCountFromStrategy begins the execution of the checks and returns +// getNewCountFromStrategy begins the execution of the checks and returns // and action containing the instance count after applying the strategy to the // metrics. -func (ch *checkRunner) GetNewCountFromStrategy(ctx context.Context, currentCount int64, +func (ch *checkRunner) getNewCountFromStrategy(ctx context.Context, currentCount int64, metrics sdk.TimestampedMetrics) (sdk.ScalingAction, error) { ch.log.Debug("calculating new count", "current count", currentCount) @@ -131,8 +131,8 @@ func (ch *checkRunner) runStrategy(ctx context.Context, currentCount int64, ms s return *runResp.Action, nil } -// QueryMetrics wraps the apm.Query call to provide operational functionality. -func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) { +// queryMetrics wraps the apm.Query call to provide operational functionality. +func (ch *checkRunner) queryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) { ch.log.Debug("querying source", "query", ch.check.Query, "source", ch.check.Source) // Trigger a metric measure to track latency of the call. 
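Note: with NewCheckRunner and its methods now unexported, check runners can only be constructed and driven from inside the policy package. A minimal sketch of how the pieces are meant to compose there, using only calls that appear elsewhere in this diff — the helper itself is hypothetical, and the real entry point is runCheckAndCapCount in the next hunk:

    // runSingleCheck is a hypothetical in-package helper: resolve the plugins,
    // build the unexported runner, query metrics, apply the strategy, and cap
    // the result to the policy bounds.
    func (h *Handler) runSingleCheck(ctx context.Context, check *sdk.ScalingPolicyCheck, currentCount int64) (sdk.ScalingAction, error) {
        s, err := h.pm.GetStrategyRunner(check.Strategy.Name)
        if err != nil {
            return sdk.ScalingAction{}, fmt.Errorf("failed to get strategy %s: %w", check.Strategy.Name, err)
        }

        mg, err := h.pm.GetAPMLooker(check.Source)
        if err != nil {
            return sdk.ScalingAction{}, fmt.Errorf("failed to get APM for strategy %s: %w", check.Strategy.Name, err)
        }

        runner := newCheckRunner(&CheckRunnerConfig{
            Log:            h.log.Named("check_handler").With("check", check.Name),
            StrategyRunner: s,
            MetricsGetter:  mg,
            Policy:         h.policy,
        }, check)

        metrics, err := runner.queryMetrics(ctx)
        if err != nil {
            return sdk.ScalingAction{}, err
        }

        action, err := runner.getNewCountFromStrategy(ctx, currentCount, metrics)
        if err != nil {
            return sdk.ScalingAction{}, err
        }

        action.CapCount(h.policy.Min, h.policy.Max)
        return action, nil
    }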
@@ -176,3 +176,26 @@ func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics
 
 	return ms, nil
 }
+
+func (ch *checkRunner) group() string {
+	return ch.check.Group
+}
+
+func (ch *checkRunner) runCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) {
+	ch.log.Debug("received policy check for evaluation")
+
+	metrics, err := ch.queryMetrics(ctx)
+	if err != nil {
+		return sdk.ScalingAction{}, fmt.Errorf("failed to query source: %v", err)
+	}
+
+	action, err := ch.getNewCountFromStrategy(ctx, currentCount, metrics)
+	if err != nil {
+		return sdk.ScalingAction{}, fmt.Errorf("failed to get count from metrics: %v", err)
+
+	}
+
+	action.CapCount(ch.policy.Min, ch.policy.Max)
+
+	return action, nil
+}
diff --git a/policy/check_test.go b/policy/check_test.go
index 78cfaf12..3a3660b3 100644
--- a/policy/check_test.go
+++ b/policy/check_test.go
@@ -148,13 +148,13 @@ func TestCheckHandler_getNewCountFromMetrics(t *testing.T) {
 		},
 	}
 
-	runner := NewCheckRunner(&CheckRunnerConfig{
+	runner := newCheckRunner(&CheckRunnerConfig{
 		Log:            hclog.NewNullLogger(),
 		StrategyRunner: sr,
 		Policy:         tt.policy,
 	}, ch)
 
-	action, err := runner.GetNewCountFromStrategy(context.Background(), 3, tt.metrics)
+	action, err := runner.getNewCountFromStrategy(context.Background(), 3, tt.metrics)
 
 	must.Eq(t, tt.expectedAction, action)
 	must.Eq(t, tt.expError, errors.Unwrap(err))
@@ -226,7 +226,7 @@ func TestCheckHandler_runAPMQuery(t *testing.T) {
 				},
 			}
 
-			handler := NewCheckRunner(&CheckRunnerConfig{
+			handler := newCheckRunner(&CheckRunnerConfig{
 				Log:           hclog.NewNullLogger(),
 				MetricsGetter: mockLooker,
 				Policy: &sdk.ScalingPolicy{
@@ -234,7 +234,7 @@ func TestCheckHandler_runAPMQuery(t *testing.T) {
 				},
 			}, check)
 
-			result, err := handler.QueryMetrics(context.Background())
+			result, err := handler.queryMetrics(context.Background())
 			must.Eq(t, tc.expResult, result)
 			must.True(t, errors.Is(err, tc.expErr))
 		})
diff --git a/policy/handler.go b/policy/handler.go
index af4c6ace..8a7e946e 100644
--- a/policy/handler.go
+++ b/policy/handler.go
@@ -60,6 +60,11 @@ type limiter interface {
 	ReleaseSlot(p *sdk.ScalingPolicy)
 }
 
+type checker interface {
+	runCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error)
+	group() string
+}
+
 // Handler monitors a policy for changes and controls when them are sent for
 // evaluation.
type Handler struct { @@ -71,7 +76,7 @@ type Handler struct { policyLock sync.RWMutex policy *sdk.ScalingPolicy - checkRunners []*checkRunner + checkRunners []checker targetController targetpkg.Controller @@ -95,6 +100,10 @@ type Handler struct { outOfCooldownOn time.Time pm dependencyGetter + + // Ent only field + evaluateAfter time.Duration + historicalAPMGetter HistoricalAPMGetter } type HandlerConfig struct { @@ -105,23 +114,28 @@ type HandlerConfig struct { TargetController targetpkg.Controller Limiter *Limiter DependencyGetter dependencyGetter + + // Ent only field + HistoricalAPMGetter HistoricalAPMGetter + EvaluateAfter time.Duration } func NewPolicyHandler(config HandlerConfig) (*Handler, error) { - h := &Handler{ log: config.Log, mutators: []Mutator{ NomadAPMMutator{}, }, - pm: config.DependencyGetter, - targetController: config.TargetController, - updatesCh: config.UpdatesChan, - policy: config.Policy, - errChn: config.ErrChan, - limiter: config.Limiter, - stateLock: sync.RWMutex{}, - state: StateIdle, + pm: config.DependencyGetter, + targetController: config.TargetController, + updatesCh: config.UpdatesChan, + policy: config.Policy, + errChn: config.ErrChan, + limiter: config.Limiter, + stateLock: sync.RWMutex{}, + state: StateIdle, + historicalAPMGetter: config.HistoricalAPMGetter, + evaluateAfter: config.EvaluateAfter, } err := h.loadCheckRunners() @@ -161,30 +175,64 @@ func NewPolicyHandler(config HandlerConfig) (*Handler, error) { return h, nil } +// checkForOutOfBandEvents tries to determine if there has been any recent +// scaling events in case of the autoscaler going offline. +func checkForOutOfBandEvents(status *sdk.TargetStatus) (int64, error) { + // If the target status includes a last event meta key, check for cooldown + // due to out-of-band events. This is also useful if the Autoscaler has + // been re-deployed. 
+ if status.Meta == nil { + return 0, nil + } + + ts, ok := status.Meta[sdk.TargetStatusMetaKeyLastEvent] + if !ok { + return 0, nil + } + + return strconv.ParseInt(ts, 10, 64) +} + func (h *Handler) loadCheckRunners() error { + runners := []checker{} + + switch h.policy.Type { + case sdk.ScalingPolicyTypeCluster, sdk.ScalingPolicyTypeHorizontal: + for _, check := range h.policy.Checks { + + s, err := h.pm.GetStrategyRunner(check.Strategy.Name) + if err != nil { + return fmt.Errorf("failed to get strategy %s: %w", check.Strategy.Name, err) + } + + mg, err := h.pm.GetAPMLooker(check.Source) + if err != nil { + return fmt.Errorf("failed to get APM for strategy %s: %w", check.Strategy.Name, err) + } + + runner := newCheckRunner(&CheckRunnerConfig{ + Log: h.log.Named("check_handler").With("check", check.Name, + "source", check.Source, "strategy", check.Strategy.Name), + StrategyRunner: s, + MetricsGetter: mg, + Policy: h.policy, + }, check) + + runners = append(runners, runner) - for _, check := range h.policy.Checks { - s, err := h.pm.GetStrategyRunner(check.Strategy.Name) - if err != nil { - return fmt.Errorf("failed to get strategy %s: %w", check.Strategy.Name, err) } - mg, err := h.pm.GetAPMLooker(check.Source) + case sdk.ScalingPolicyTypeVerticalCPU, sdk.ScalingPolicyTypeVerticalMem: + runner, err := h.loadVerticalCheckRunner() if err != nil { - return fmt.Errorf("failed to get APM for strategy %s: %w", check.Strategy.Name, err) + return fmt.Errorf("failed to load vertical check %s: %w", h.policy.Type, err) } - runner := NewCheckRunner(&CheckRunnerConfig{ - Log: h.log.Named("check_handler").With("check", check.Name, - "source", check.Source, "strategy", check.Strategy.Name), - StrategyRunner: s, - MetricsGetter: mg, - Policy: h.policy, - }, check) - - h.checkRunners = append(h.checkRunners, runner) + runners = append(runners, runner) } + // Do the update as a single operation to avoid partial updates. + h.checkRunners = runners return nil } @@ -242,10 +290,6 @@ func (h *Handler) Run(ctx context.Context) { continue } - // Canonicalize action so plugins don't have to. - action.Canonicalize() - action.CapCount(h.policy.Min, h.policy.Max) - h.log.Info("calculating scaling target", "policy_id", h.policy.ID, "from", currentCount, "to", action.Count, "reason", action.Reason, "meta", action.Meta) @@ -290,7 +334,10 @@ func (h *Handler) Run(ctx context.Context) { } func scalingNeeded(a sdk.ScalingAction, countCount int64) bool { - return a.Direction != sdk.ScaleDirectionNone && countCount != a.Count + // The DAS returns count but the direction is none, for vertical and horizontal + // policies checking the direction is enough. + return (a.Direction == sdk.ScaleDirectionNone && countCount != a.Count) || + a.Direction != sdk.ScaleDirectionNone } func (h *Handler) waitAndScale(ctx context.Context) error { @@ -330,23 +377,6 @@ func (h *Handler) waitAndScale(ctx context.Context) error { return nil } -// Convert the last event string. -func checkForOutOfBandEvents(status *sdk.TargetStatus) (int64, error) { - // If the target status includes a last event meta key, check for cooldown - // due to out-of-band events. This is also useful if the Autoscaler has - // been re-deployed. - if status.Meta == nil { - return 0, nil - } - - ts, ok := status.Meta[sdk.TargetStatusMetaKeyLastEvent] - if !ok { - return 0, nil - } - - return strconv.ParseInt(ts, 10, 64) -} - // updateHandler updates the handler's internal state based on the changes in // the policy being monitored. 
func (h *Handler) updateHandler(updatedPolicy *sdk.ScalingPolicy) { @@ -368,8 +398,6 @@ func (h *Handler) updateHandler(updatedPolicy *sdk.ScalingPolicy) { h.policyLock.Lock() defer h.policyLock.Unlock() - h.log.Debug("updating check handlers", "old_checks", len(h.policy.Checks), "new_checks", len(updatedPolicy.Checks)) - // Clear existing check handlers and load new ones. h.checkRunners = nil @@ -393,7 +421,7 @@ func (h *Handler) applyMutators(p *sdk.ScalingPolicy) { } } -// Handle policy is the main part of the controller, it reads the target state, +// calculateNewCount is the main part of the controller, it // gets the metrics and the necessary new count to keep up with the policy // and generates a scaling action if needed. func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) { @@ -411,34 +439,27 @@ func (h *Handler) calculateNewCount(ctx context.Context, currentCount int64) (sd checkGroups := make(map[string][]checkResult) for _, ch := range h.checkRunners { - h.log.Debug("received policy check for evaluation") - - metrics, err := ch.QueryMetrics(ctx) - if err != nil { - return sdk.ScalingAction{}, fmt.Errorf("failed to query source: %v", err) - } - - action, err := ch.GetNewCountFromStrategy(ctx, currentCount, metrics) + action, err := ch.runCheckAndCapCount(ctx, currentCount) if err != nil { return sdk.ScalingAction{}, fmt.Errorf("failed get count from metrics: %v", err) } - group := ch.check.Group - checkGroups[group] = append(checkGroups[group], checkResult{ + g := ch.group() + checkGroups[g] = append(checkGroups[g], checkResult{ action: &action, handler: ch, - group: group, + group: g, }) } winner := pickWinnerActionFromGroups(checkGroups) - if winner.handler == nil || winner.action == nil || winner.action.Direction == sdk.ScaleDirectionNone { + if winner.handler == nil || winner.action == nil { return sdk.ScalingAction{}, nil } - h.log.Debug("check selected", "name", winner.handler.check.Name, - "direction", winner.action.Direction, "count", winner.action.Count) + h.log.Debug("check selected", "direction", winner.action.Direction, + "count", winner.action.Count) // At this point the checks have finished. Therefore emit of metric data // tracking how long it takes to run all the checks within a policy. @@ -599,7 +620,7 @@ func calculateRemainingCooldown(cd time.Duration, ts, lastEvent int64) time.Dura type checkResult struct { action *sdk.ScalingAction - handler *checkRunner + handler checker group string } diff --git a/policy/handler_oss.go b/policy/handler_oss.go new file mode 100644 index 00000000..6cb29bce --- /dev/null +++ b/policy/handler_oss.go @@ -0,0 +1,42 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +//go:build !ent +// +build !ent + +package policy + +import ( + "context" + + "github.com/hashicorp/nomad-autoscaler/sdk" +) + +type HistoricalAPMGetter interface{} + +type noopHistoricalAPMGetter struct{} + +type noopVerticalCheckRunner struct { + policy *sdk.ScalingPolicy +} + +func (nv *noopVerticalCheckRunner) runCheckAndCapCount(_ context.Context, currentCount int64) (sdk.ScalingAction, error) { + a := sdk.ScalingAction{ + Direction: sdk.ScaleDirectionNone, + Count: currentCount, + } + + a.CapCount(nv.policy.Min, nv.policy.Max) + + return a, nil +} + +func (nv *noopVerticalCheckRunner) group() string { + return "" +} + +func (h *Handler) loadVerticalCheckRunner() (*noopVerticalCheckRunner, error) { + return &noopVerticalCheckRunner{ + policy: h.policy, + }, nil +} diff --git a/policy/handler_test.go b/policy/handler_test.go index 54e54fd0..03d72aa4 100644 --- a/policy/handler_test.go +++ b/policy/handler_test.go @@ -290,7 +290,6 @@ func Test_pickWinnerActionFromGroups(t *testing.T) { must.Eq(t, tt.wantAction.Direction, result.action.Direction) must.Eq(t, tt.wantAction.Count, result.action.Count) must.NotNil(t, result.handler) - must.Eq(t, tt.wantHandler.check.Name, result.handler.check.Name) } }) } @@ -399,6 +398,7 @@ func TestHandler_Run_TargetNotReady_Integration(t *testing.T) { } var policy = &sdk.ScalingPolicy{ + Type: sdk.ScalingPolicyTypeHorizontal, ID: "test-policy", EvaluationInterval: 20 * time.Millisecond, Min: 1, @@ -704,5 +704,4 @@ func TestHandler_Run_StateChanges_Integration(t *testing.T) { }, handler.getNextAction()) }) } - } diff --git a/policy/manager.go b/policy/manager.go index fb5b1ca4..396b8be9 100644 --- a/policy/manager.go +++ b/policy/manager.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "maps" + "slices" "strings" "sync" "time" @@ -22,8 +23,9 @@ import ( const DefaultLimiterTimeout = 2 * time.Minute type handlerTracker struct { - cancel context.CancelFunc - updates chan<- *sdk.ScalingPolicy + policyType string + cancel context.CancelFunc + updates chan<- *sdk.ScalingPolicy } type targetGetter interface { @@ -60,22 +62,28 @@ type Manager struct { *Limiter pluginManager dependencyGetter + + // Ent only fields + evaluateAfter time.Duration + historicalAPMGetter HistoricalAPMGetter } // NewManager returns a new Manager. 
func NewManager(log hclog.Logger, ps map[SourceName]Source, pm *manager.PluginManager, mInt time.Duration, l *Limiter) *Manager { return &Manager{ - log: log.ResetNamed("policy_manager"), - policySources: ps, - targetGetter: pm, - handlersLock: sync.RWMutex{}, - handlers: make(map[SourceName]map[PolicyID]*handlerTracker), - metricsInterval: mInt, - policyIDsCh: make(chan IDMessage, 2), - policyIDsErrCh: make(chan error, 2), - Limiter: l, - pluginManager: pm, + log: log.ResetNamed("policy_manager"), + policySources: ps, + targetGetter: pm, + handlersLock: sync.RWMutex{}, + handlers: make(map[SourceName]map[PolicyID]*handlerTracker), + metricsInterval: mInt, + policyIDsCh: make(chan IDMessage, 2), + policyIDsErrCh: make(chan error, 2), + Limiter: l, + pluginManager: pm, + evaluateAfter: 0, + historicalAPMGetter: &noopHistoricalAPMGetter{}, } } @@ -250,8 +258,9 @@ func (m *Manager) processMessageAndUpdateHandlers(ctx context.Context, message I upCh := make(chan *sdk.ScalingPolicy, 1) nht := &handlerTracker{ - updates: upCh, - cancel: handlerCancel, + updates: upCh, + cancel: handlerCancel, + policyType: updatedPolicy.Type, } m.log.Debug("creating new policy handler", @@ -271,12 +280,13 @@ func (m *Manager) processMessageAndUpdateHandlers(ctx context.Context, message I Log: m.log.Named("policy_handler").With("policy_id", policyID, "source", message.Source, "target", updatedPolicy.Target.Name, "target_config", updatedPolicy.Target.Config), - Policy: updatedPolicy, - UpdatesChan: upCh, - ErrChan: m.policyIDsErrCh, - TargetController: tg, - DependencyGetter: m.pluginManager, - Limiter: m.Limiter, + Policy: updatedPolicy, + UpdatesChan: upCh, + ErrChan: m.policyIDsErrCh, + TargetController: tg, + DependencyGetter: m.pluginManager, + Limiter: m.Limiter, + HistoricalAPMGetter: m.historicalAPMGetter, }) if err != nil { m.log.Error("encountered an error starting the policy handler", @@ -304,21 +314,13 @@ func (m *Manager) stopHandlers() { for source, handlers := range m.handlers { for id := range handlers { m.stopHandler(source, id) + delete(handlers, id) } + delete(m.handlers, source) } } -func (m *Manager) addHandlerTracker(source SourceName, id PolicyID, nht *handlerTracker) { - m.handlersLock.Lock() - if m.handlers[source] == nil { - m.handlers[source] = make(map[PolicyID]*handlerTracker) - } - - m.handlers[source][id] = nht - m.handlersLock.Unlock() -} - // stopHandler stops a handler and removes it from the manager's internal // state storage. // @@ -332,6 +334,30 @@ func (m *Manager) stopHandler(source SourceName, id PolicyID) { close(ht.updates) } +func (m *Manager) StopHandlersByType(typesToStop ...string) { + m.handlersLock.Lock() + defer m.handlersLock.Unlock() + + for source, handlers := range m.handlers { + for id, tracker := range handlers { + if slices.Contains(typesToStop, tracker.policyType) { + m.stopHandler(source, id) + delete(handlers, id) + } + } + } +} + +func (m *Manager) addHandlerTracker(source SourceName, id PolicyID, nht *handlerTracker) { + m.handlersLock.Lock() + if m.handlers[source] == nil { + m.handlers[source] = make(map[PolicyID]*handlerTracker) + } + + m.handlers[source][id] = nht + m.handlersLock.Unlock() +} + // ReloadSources triggers a reload of all the policy sources. 
func (m *Manager) ReloadSources() { @@ -341,6 +367,14 @@ func (m *Manager) ReloadSources() { } } +func (m *Manager) SetEvaluateAfter(ea time.Duration) { + m.evaluateAfter = ea +} + +func (m *Manager) SetHistoricalAPMGetter(hag HistoricalAPMGetter) { + m.historicalAPMGetter = hag +} + // periodicMetricsReporter periodically emits metrics for the policy manager // which cannot be performed during inline function calls. func (m *Manager) periodicMetricsReporter(ctx context.Context, interval time.Duration) { @@ -388,16 +422,20 @@ type Limiter struct { var ErrExecutionTimeout = errors.New("timeout while waiting for slot") -func NewLimiter(timeout time.Duration, hWorkersCount, cWorkersCount int) *Limiter { +func NewLimiter(timeout time.Duration, workersConfig map[string]int) *Limiter { return &Limiter{ timeout: timeout, slots: map[string]chan struct{}{ - "horizontal": make(chan struct{}, hWorkersCount), - "cluster": make(chan struct{}, cWorkersCount), + sdk.ScalingPolicyTypeHorizontal: make(chan struct{}, workersConfig["horizontal"]), + sdk.ScalingPolicyTypeCluster: make(chan struct{}, workersConfig["cluster"]), }, } } +func (l *Limiter) AddQueue(queue string, size int) { + l.slots[queue] = make(chan struct{}, size) +} + func (l *Limiter) QueueSize(queue string) int { return len(l.slots[queue]) } diff --git a/policy/manager_test.go b/policy/manager_test.go index 9b2dd6ff..b925a3c1 100644 --- a/policy/manager_test.go +++ b/policy/manager_test.go @@ -121,6 +121,7 @@ func (ms *mockSource) ReloadIDsMonitor() var policy1 = &sdk.ScalingPolicy{ ID: "policy1", + Type: sdk.ScalingPolicyTypeCluster, Enabled: true, Checks: []*sdk.ScalingPolicyCheck{ { @@ -147,6 +148,7 @@ var policy1 = &sdk.ScalingPolicy{ var policy2 = &sdk.ScalingPolicy{ ID: "policy2", Enabled: true, + Type: sdk.ScalingPolicyTypeHorizontal, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", @@ -164,6 +166,7 @@ var policy2 = &sdk.ScalingPolicy{ var policy3 = &sdk.ScalingPolicy{ ID: "policy3", + Type: sdk.ScalingPolicyTypeHorizontal, Enabled: true, Checks: []*sdk.ScalingPolicyCheck{ { @@ -552,7 +555,8 @@ func TestProcessMessageAndUpdateHandlers_GetTargetReporterError(t *testing.T) { name: "mock-source", latestVersion: map[PolicyID]*sdk.ScalingPolicy{ "policy1": { - ID: "policy1", + ID: "policy1", + Type: sdk.ScalingPolicyTypeCluster, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", @@ -563,7 +567,8 @@ func TestProcessMessageAndUpdateHandlers_GetTargetReporterError(t *testing.T) { }, }, "policy2": { - ID: "policy2", + ID: "policy2", + Type: sdk.ScalingPolicyTypeHorizontal, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", @@ -574,7 +579,8 @@ func TestProcessMessageAndUpdateHandlers_GetTargetReporterError(t *testing.T) { }, }, "policy3": { - ID: "policy3", + ID: "policy3", + Type: sdk.ScalingPolicyTypeVerticalCPU, Checks: []*sdk.ScalingPolicyCheck{ { Name: "check1", diff --git a/policy/nomad/source.go b/policy/nomad/source.go index ddb24494..82ed5c67 100644 --- a/policy/nomad/source.go +++ b/policy/nomad/source.go @@ -254,9 +254,9 @@ func (s *Source) canonicalizePolicy(p *sdk.ScalingPolicy) { func (s *Source) canonicalizePolicyByType(p *sdk.ScalingPolicy) { switch p.Type { - case "horizontal": + case sdk.ScalingPolicyTypeHorizontal: s.canonicalizeHorizontalPolicy(p) - case "cluster": + case sdk.ScalingPolicyTypeCluster: // Nothing to do for now. 
 	default:
 		s.canonicalizeAdditionalTypes(p)
diff --git a/policy/nomad/validate.go b/policy/nomad/validate.go
index ba4cb120..22be0ff7 100644
--- a/policy/nomad/validate.go
+++ b/policy/nomad/validate.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad-autoscaler/plugins"
+	"github.com/hashicorp/nomad-autoscaler/sdk"
 	"github.com/hashicorp/nomad-autoscaler/sdk/helper/ptr"
 	"github.com/hashicorp/nomad/api"
 )
@@ -54,9 +55,9 @@ func validateScalingPolicy(policy *api.ScalingPolicy) error {
 
 func validateScalingPolicyByType(policy *api.ScalingPolicy) error {
 	switch policy.Type {
-	case "horizontal", "":
+	case sdk.ScalingPolicyTypeHorizontal, "":
 		return validateHorizontalPolicy(policy)
-	case "cluster":
+	case sdk.ScalingPolicyTypeCluster:
 		return validateClusterPolicy(policy)
 	default:
 		return additionalPolicyTypeValidation(policy)
diff --git a/sdk/policy.go b/sdk/policy.go
index 43dabc79..59c11200 100644
--- a/sdk/policy.go
+++ b/sdk/policy.go
@@ -4,6 +4,7 @@ package sdk
 
 import (
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -13,8 +14,10 @@ import (
 )
 
 const (
-	ScalingPolicyTypeCluster    = "cluster"
-	ScalingPolicyTypeHorizontal = "horizontal"
+	ScalingPolicyTypeCluster     = "cluster"
+	ScalingPolicyTypeHorizontal  = "horizontal"
+	ScalingPolicyTypeVerticalCPU = "vertical_cpu"
+	ScalingPolicyTypeVerticalMem = "vertical_mem"
 
 	ScalingPolicyOnErrorFail   = "fail"
 	ScalingPolicyOnErrorIgnore = "ignore"
@@ -89,6 +92,10 @@ func (p *ScalingPolicy) Validate() error {
 
 	var result *multierror.Error
 
+	if p.Type == "" {
+		result = multierror.Append(result, errors.New("policy has no type defined"))
+	}
+
 	switch p.OnCheckError {
 	case "", ScalingPolicyOnErrorFail, ScalingPolicyOnErrorIgnore:
 	default:
diff --git a/sdk/policy_test.go b/sdk/policy_test.go
index 62a6f540..cd8b2cbb 100644
--- a/sdk/policy_test.go
+++ b/sdk/policy_test.go
@@ -17,9 +17,11 @@ func TestScalingPolicy_Validate(t *testing.T) {
 		expectedError string
 	}{
 		{
-			name:          "nil policy",
-			policy:        nil,
-			expectedError: "",
+			name: "no type",
+			policy: &ScalingPolicy{
+				Type: "",
+			},
+			expectedError: "policy has no type defined",
 		},
 		{
 			name: "invalid on_check_error",
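
Note: NewLimiter now receives the whole policy-eval workers map, the Limiter gains AddQueue, and the Manager gains StopHandlersByType plus the SetEvaluateAfter/SetHistoricalAPMGetter hooks. A rough wiring sketch from outside the package — the queue sizes, intervals, and the ent toggling flow are illustrative assumptions; only the names and signatures come from this diff:

    // setupPolicyEval is a hypothetical constructor showing how the new knobs
    // could be combined when vertical policies are enabled.
    func setupPolicyEval(log hclog.Logger, sources map[policy.SourceName]policy.Source,
        pm *manager.PluginManager, workers map[string]int,
        historicalGetter policy.HistoricalAPMGetter) *policy.Manager {

        limiter := policy.NewLimiter(policy.DefaultLimiterTimeout, workers)

        // Register extra queues for the new vertical policy types (sizes are
        // illustrative).
        limiter.AddQueue(sdk.ScalingPolicyTypeVerticalCPU, 5)
        limiter.AddQueue(sdk.ScalingPolicyTypeVerticalMem, 5)

        m := policy.NewManager(log, sources, pm, 10*time.Second, limiter)
        m.SetEvaluateAfter(5 * time.Minute)
        m.SetHistoricalAPMGetter(historicalGetter)

        return m
    }

    // If vertical handling is later disabled, only those handlers need to be
    // stopped; horizontal and cluster handlers keep running:
    //
    //     m.StopHandlersByType(sdk.ScalingPolicyTypeVerticalCPU, sdk.ScalingPolicyTypeVerticalMem)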
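
Note: Validate now rejects policies without a type, and two vertical types join the existing cluster and horizontal constants. A small sketch of the new behaviour (the policy values are illustrative):

    p := &sdk.ScalingPolicy{
        ID:  "example-policy",
        Min: 1,
        Max: 10,
        // Type deliberately left empty.
    }

    if err := p.Validate(); err != nil {
        // The returned multierror includes "policy has no type defined".
    }

    // Any of the constants satisfies the new requirement, including the two
    // added vertical types:
    p.Type = sdk.ScalingPolicyTypeVerticalCPU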