Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1a3aa70
style: pass all the workers configuration to the NewLimiter func
Juanadelacuesta Aug 12, 2025
12f2033
func: add oss support for vertical policues
Juanadelacuesta Aug 12, 2025
c0ebf91
fix: udate all values of the handler on policy update
Juanadelacuesta Aug 12, 2025
d01f41e
func: update integration handler tests
Juanadelacuesta Aug 12, 2025
e96c2a6
func: add the possibilty to stop stop handlers by type
Juanadelacuesta Aug 12, 2025
96cdf2e
fix: update integration tests to load the correct count function
Juanadelacuesta Aug 13, 2025
519a63b
func: separate the manager configuration
Juanadelacuesta Aug 13, 2025
1d24c90
func: add handler_oss
Juanadelacuesta Aug 13, 2025
bbcd6e5
func: create the ent policy types and use the const instead of teh st…
Juanadelacuesta Aug 13, 2025
271f7cf
func: update the agent to pick up different policy manager configs
Juanadelacuesta Aug 13, 2025
86b2cd3
func: settle on one checker, not by type
Juanadelacuesta Aug 14, 2025
4875e70
func: create the vertical checker for both ent and oss
Juanadelacuesta Aug 14, 2025
3cc8e33
fix: add policy type to test and add validation for type plus fix val…
Juanadelacuesta Aug 15, 2025
3adf812
func: add policy type to tests policies
Juanadelacuesta Aug 15, 2025
28d22f7
fix: check and report error on loading the check runners
Juanadelacuesta Aug 15, 2025
2633787
func: put the policy manager init back to common code
Juanadelacuesta Aug 15, 2025
8765b8f
func: pass the historial apm getter from manager to handler
Juanadelacuesta Aug 15, 2025
158d587
func: add default historical apm getter to avoid panics
Juanadelacuesta Aug 15, 2025
d2df308
func: set teh check for action direction into only the scaling needed…
Juanadelacuesta Sep 5, 2025
5b0e999
style: clean up comments
Juanadelacuesta Sep 5, 2025
3695f13
func: set all the checks functions and types as private, remove the c…
Juanadelacuesta Sep 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 44 additions & 44 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ func (a *Agent) Run(ctx context.Context) error {
defer close(policyEvalCh)

limiter := policy.NewLimiter(policy.DefaultLimiterTimeout,
a.config.PolicyEval.Workers["horizontal"],
a.config.PolicyEval.Workers["cluster"])
a.config.PolicyEval.Workers)

if err := a.setupPolicyManager(limiter); err != nil {
return fmt.Errorf("failed to setup policy manager: %v", err)
}

go a.policyManager.Run(ctx, policyEvalCh)

a.initEnt(ctx, a.entReload)
Expand All @@ -89,48 +89,6 @@ func (a *Agent) Run(ctx context.Context) error {
return nil
}

func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error {

// Create our processor, a shared method for performing basic policy
// actions.
cfgDefaults := policy.ConfigDefaults{
DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval,
DefaultCooldown: a.config.Policy.DefaultCooldown,
}
policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames())

// Setup our initial default policy source which is Nomad.
sources := map[policy.SourceName]policy.Source{}
for _, s := range a.config.Policy.Sources {
if s.Enabled == nil || !*s.Enabled {
continue
}

switch policy.SourceName(s.Name) {
case policy.SourceNameNomad:
sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor)
case policy.SourceNameFile:
// Only setup the file source if operators have configured a
// scaling policy directory to read from.
if a.config.Policy.Dir != "" {
sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor)
}
}
}

// TODO: Once full policy source reload is implemented this should probably
// be just a warning.
if len(sources) == 0 {
return errors.New("no policy source available")
}

a.policySources = sources
a.policyManager = policy.NewManager(a.logger, a.policySources,
a.pluginManager, a.config.Telemetry.CollectionInterval, limiter)

return nil
}

func (a *Agent) stop() {
// Kill all the plugins.
if a.pluginManager != nil {
Expand Down Expand Up @@ -189,6 +147,48 @@ func (a *Agent) reload() {
}
}

func (a *Agent) setupPolicyManager(limiter *policy.Limiter) error {

// Create our processor, a shared method for performing basic policy
// actions.
cfgDefaults := policy.ConfigDefaults{
DefaultEvaluationInterval: a.config.Policy.DefaultEvaluationInterval,
DefaultCooldown: a.config.Policy.DefaultCooldown,
}
policyProcessor := policy.NewProcessor(&cfgDefaults, a.getNomadAPMNames())

// Setup our initial default policy source which is Nomad.
sources := map[policy.SourceName]policy.Source{}
for _, s := range a.config.Policy.Sources {
if s.Enabled == nil || !*s.Enabled {
continue
}

switch policy.SourceName(s.Name) {
case policy.SourceNameNomad:
sources[policy.SourceNameNomad] = nomadPolicy.NewNomadSource(a.logger, a.NomadClient, policyProcessor)
case policy.SourceNameFile:
// Only setup the file source if operators have configured a
// scaling policy directory to read from.
if a.config.Policy.Dir != "" {
sources[policy.SourceNameFile] = filePolicy.NewFileSource(a.logger, a.config.Policy.Dir, policyProcessor)
}
}
}

// TODO: Once full policy source reload is implemented this should probably
// be just a warning.
if len(sources) == 0 {
return errors.New("no policy source available")
}

a.policySources = sources
a.policyManager = policy.NewManager(a.logger, a.policySources,
a.pluginManager, a.config.Telemetry.CollectionInterval, limiter)

return nil
}

// handleSignals blocks until the agent receives an exit signal.
func (a *Agent) handleSignals() {

Expand Down
4 changes: 3 additions & 1 deletion agent/agent_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

package agent

import "context"
import (
"context"
)

func (a *Agent) initEnt(ctx context.Context, reload <-chan any) {
go func() {
Expand Down
33 changes: 28 additions & 5 deletions policy/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type checkRunner struct {
}

// NewCheckHandler returns a new checkHandler instance.
func NewCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *checkRunner {
func newCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *checkRunner {
return &checkRunner{
log: config.Log,
check: c,
Expand All @@ -51,10 +51,10 @@ func NewCheckRunner(config *CheckRunnerConfig, c *sdk.ScalingPolicyCheck) *check
}
}

// GetNewCountFromStrategy begins the execution of the checks and returns
// getNewCountFromStrategy begins the execution of the checks and returns
// and action containing the instance count after applying the strategy to the
// metrics.
func (ch *checkRunner) GetNewCountFromStrategy(ctx context.Context, currentCount int64,
func (ch *checkRunner) getNewCountFromStrategy(ctx context.Context, currentCount int64,
metrics sdk.TimestampedMetrics) (sdk.ScalingAction, error) {
ch.log.Debug("calculating new count", "current count", currentCount)

Expand Down Expand Up @@ -131,8 +131,8 @@ func (ch *checkRunner) runStrategy(ctx context.Context, currentCount int64, ms s
return *runResp.Action, nil
}

// QueryMetrics wraps the apm.Query call to provide operational functionality.
func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) {
// queryMetrics wraps the apm.Query call to provide operational functionality.
func (ch *checkRunner) queryMetrics(ctx context.Context) (sdk.TimestampedMetrics, error) {
ch.log.Debug("querying source", "query", ch.check.Query, "source", ch.check.Source)

// Trigger a metric measure to track latency of the call.
Expand Down Expand Up @@ -176,3 +176,26 @@ func (ch *checkRunner) QueryMetrics(ctx context.Context) (sdk.TimestampedMetrics

return ms, nil
}

func (ch *checkRunner) group() string {
return ch.check.Group
}

func (ch *checkRunner) runCheckAndCapCount(ctx context.Context, currentCount int64) (sdk.ScalingAction, error) {
ch.log.Debug("received policy check for evaluation")

metrics, err := ch.queryMetrics(ctx)
if err != nil {
return sdk.ScalingAction{}, fmt.Errorf("failed to query source: %v", err)
}

action, err := ch.getNewCountFromStrategy(ctx, currentCount, metrics)
if err != nil {
return sdk.ScalingAction{}, fmt.Errorf("failed get count from metrics: %v", err)

}

action.CapCount(ch.policy.Min, ch.policy.Max)

return action, nil
}
8 changes: 4 additions & 4 deletions policy/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ func TestCheckHandler_getNewCountFromMetrics(t *testing.T) {
},
}

runner := NewCheckRunner(&CheckRunnerConfig{
runner := newCheckRunner(&CheckRunnerConfig{
Log: hclog.NewNullLogger(),
StrategyRunner: sr,
Policy: tt.policy,
}, ch)

action, err := runner.GetNewCountFromStrategy(context.Background(), 3, tt.metrics)
action, err := runner.getNewCountFromStrategy(context.Background(), 3, tt.metrics)
must.Eq(t, tt.expectedAction, action)
must.Eq(t, tt.expError, errors.Unwrap(err))

Expand Down Expand Up @@ -226,15 +226,15 @@ func TestCheckHandler_runAPMQuery(t *testing.T) {
},
}

handler := NewCheckRunner(&CheckRunnerConfig{
handler := newCheckRunner(&CheckRunnerConfig{
Log: hclog.NewNullLogger(),
MetricsGetter: mockLooker,
Policy: &sdk.ScalingPolicy{
ID: "testPolicy",
},
}, check)

result, err := handler.QueryMetrics(context.Background())
result, err := handler.queryMetrics(context.Background())
must.Eq(t, tc.expResult, result)
must.True(t, errors.Is(err, tc.expErr))
})
Expand Down
Loading
Loading