Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/26858.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
config: Added job_max_count option to limit number of allocs for a single job
```
9 changes: 9 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf.JobMaxPriority = jobMaxPriority
conf.JobDefaultPriority = jobDefaultPriority

jobMaxCount := structs.JobDefaultMaxCount
if agentConfig.Server.JobMaxCount != nil {
jobMaxCount = *agentConfig.Server.JobMaxCount
if jobMaxCount < 0 {
return nil, fmt.Errorf("job_max_count (%d) cannot be negative", jobMaxCount)
}
}
conf.JobMaxCount = jobMaxCount

if agentConfig.Server.JobTrackedVersions != nil {
if *agentConfig.Server.JobTrackedVersions <= 0 {
return nil, fmt.Errorf("job_tracked_versions must be greater than 0")
Expand Down
65 changes: 65 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,71 @@ func TestAgent_ServerConfig_JobDefaultPriority_Bad(t *testing.T) {
}
}

func TestAgent_ServerConfig_JobMaxCount(t *testing.T) {
ci.Parallel(t)

cases := []struct {
configured *int
expected int
expectedErr string
}{
{
configured: nil,
expected: structs.JobDefaultMaxCount,
expectedErr: "",
},
{
configured: pointer.Of(1),
expected: 1,
expectedErr: "",
},
{
configured: pointer.Of(0),
expected: 0,
expectedErr: "",
},
{
configured: pointer.Of(structs.JobDefaultMaxCount),
expected: structs.JobDefaultMaxCount,
expectedErr: "",
},
{
configured: pointer.Of(2 * structs.JobDefaultMaxCount),
expected: 2 * structs.JobDefaultMaxCount,
expectedErr: "",
},
{
configured: pointer.Of(-1),
expected: 0,
expectedErr: "job_max_count (-1) cannot be negative",
},
{
configured: pointer.Of(-3),
expected: 0,
expectedErr: "job_max_count (-3) cannot be negative",
},
}

for _, tc := range cases {
t.Run(fmt.Sprint(tc.configured), func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobMaxCount = tc.configured

serverConf, err := convertServerConfig(conf)

if tc.expectedErr != "" {
must.Error(t, err)
must.ErrorContains(t, err, tc.expectedErr)
} else {
must.NoError(t, err)
must.Eq(t, tc.expected, serverConf.JobMaxCount)
}
})
}
}

func Test_convertServerConfig_clientIntroduction(t *testing.T) {
ci.Parallel(t)

Expand Down
7 changes: 7 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,9 @@ type ServerConfig struct {
// JobMaxPriority is an upper bound on the Job priority.
JobMaxPriority *int `hcl:"job_max_priority"`

// JobMaxCount is an upper bound on the number of instances in a Job.
JobMaxCount *int `hcl:"job_max_count"`

// JobMaxSourceSize limits the maximum size of a jobs source hcl/json
// before being discarded automatically. If unset, the maximum size defaults
// to 1 MB. If the value is zero, no job sources will be stored.
Expand Down Expand Up @@ -810,6 +813,7 @@ func (s *ServerConfig) Copy() *ServerConfig {
ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
ns.JobMaxCount = pointer.Copy(s.JobMaxCount)
ns.JobTrackedVersions = pointer.Copy(s.JobTrackedVersions)
ns.ClientIntroduction = s.ClientIntroduction.Copy()
return &ns
Expand Down Expand Up @@ -2505,6 +2509,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.JobMaxPriority != nil {
result.JobMaxPriority = pointer.Of(*b.JobMaxPriority)
}
if b.JobMaxCount != nil {
result.JobMaxCount = pointer.Of(*b.JobMaxCount)
}
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
Expand Down
1 change: 1 addition & 0 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ var basicConfig = &Config{
LicensePath: "/tmp/nomad.hclic",
JobDefaultPriority: pointer.Of(100),
JobMaxPriority: pointer.Of(200),
JobMaxCount: pointer.Of(1000),
StartTimeout: "1m",
ClientIntroduction: &ClientIntroduction{
Enforcement: "warn",
Expand Down
1 change: 1 addition & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func TestConfig_Merge(t *testing.T) {
},
JobMaxPriority: pointer.Of(200),
JobDefaultPriority: pointer.Of(100),
JobMaxCount: pointer.Of(1000),
OIDCIssuer: "https://oidc.test.nomadproject.io",
StartTimeout: "1m",
},
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ server {
event_buffer_size = 200
job_default_priority = 100
job_max_priority = 200
job_max_count = 1000
start_timeout = "1m"

plan_rejection_tracker {
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@
"license_path": "/tmp/nomad.hclic",
"job_default_priority": 100,
"job_max_priority": 200,
"job_max_count": 1000,
"start_timeout": "1m"
}
],
Expand Down
4 changes: 4 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ type Config struct {
// JobTrackedVersions is the number of historic Job versions that are kept.
JobTrackedVersions int

// JobMaxCount is the maximum total task group counts for a single Job.
JobMaxCount int

Reporting *config.ReportingConfig

// OIDCIssuer is the URL for the OIDC Issuer field in Workload Identity JWTs.
Expand Down Expand Up @@ -664,6 +667,7 @@ func DefaultConfig() *Config {
DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,
JobDefaultPriority: structs.JobDefaultPriority,
JobMaxPriority: structs.JobDefaultMaxPriority,
JobMaxCount: structs.JobDefaultMaxCount,
JobTrackedVersions: structs.JobDefaultTrackedVersions,
StartTimeout: 30 * time.Second,
NodeIntroductionConfig: structs.DefaultNodeIntroductionConfig(),
Expand Down
13 changes: 12 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,8 +1040,19 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
}
}

// Ensure that JobMaxCount is respected.
newCount := int(*args.Count)
totalCount := 0
for _, tg := range job.TaskGroups {
totalCount += tg.Count
}
totalCount = totalCount - group.Count + newCount
if j.srv.config.JobMaxCount > 0 && totalCount > j.srv.config.JobMaxCount {
return fmt.Errorf("total count was greater than configured job_max_count: %d > %d", totalCount, j.srv.config.JobMaxCount)
}

// Update group count
group.Count = int(*args.Count)
group.Count = newCount
job.SubmitTime = now

// Block scaling event if there's an active deployment
Expand Down
7 changes: 7 additions & 0 deletions nomad/job_endpoint_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,10 @@ func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {

okForIdentity := v.isEligibleForMultiIdentity()

totalCount := 0
for _, tg := range job.TaskGroups {
totalCount += tg.Count

for _, s := range tg.Services {
serviceErrs := v.validateServiceIdentity(
s, fmt.Sprintf("task group %s", tg.Name), okForIdentity)
Expand All @@ -543,6 +546,10 @@ func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
warnings = append(warnings, vaultWarns...)
}
}
if v.srv.config.JobMaxCount > 0 && totalCount > v.srv.config.JobMaxCount {
err := fmt.Errorf("total count was greater than configured job_max_count: %d > %d", totalCount, v.srv.config.JobMaxCount)
multierror.Append(validationErrors, err)
}

return warnings, validationErrors.ErrorOrNil()
}
Expand Down
28 changes: 28 additions & 0 deletions nomad/job_endpoint_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,34 @@ import (
"github.com/shoenig/test/must"
)

func Test_jobValidate_Validate(t *testing.T) {
ci.Parallel(t)

t.Run("error if task group count exceeds job_max_count", func(t *testing.T) {
impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 10, JobMaxPriority: 100}}}
job := mock.Job()
job.TaskGroups[0].Count = 11
_, err := impl.Validate(job)
must.ErrorContains(t, err, "total count was greater than configured job_max_count: 11 > 10")
})

t.Run("no error if task group count equals job_max_count", func(t *testing.T) {
impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 10, JobMaxPriority: 100}}}
job := mock.Job()
job.TaskGroups[0].Count = 10
_, err := impl.Validate(job)
must.NoError(t, err)
})

t.Run("no error if job_max_count is zero (i.e. unlimited)", func(t *testing.T) {
impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 0, JobMaxPriority: 100}}}
job := mock.Job()
job.TaskGroups[0].Count = structs.JobDefaultMaxCount + 1
_, err := impl.Validate(job)
must.NoError(t, err)
})
}

func Test_jobValidate_Validate_consul_service(t *testing.T) {
ci.Parallel(t)

Expand Down
40 changes: 39 additions & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7683,7 +7683,7 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) {
require.Contains(err.Error(), "should not contain count if error is true")
}

func TestJobEndpoint_Scale_OutOfBounds(t *testing.T) {
func TestJobEndpoint_Scale_TaskGroupOutOfBounds(t *testing.T) {
ci.Parallel(t)
require := require.New(t)

Expand Down Expand Up @@ -7726,6 +7726,44 @@ func TestJobEndpoint_Scale_OutOfBounds(t *testing.T) {
require.Contains(err.Error(), "group count was less than scaling policy minimum: 2 < 3")
}

func TestJobEndpoint_Scale_JobOutOfBounds(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(config *Config) {
config.JobMaxCount = 4
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()

const requestedCount = 6
job := mock.Job()
job.TaskGroups[0].Count = requestedCount

// register the job
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
must.NoError(t, err)

var resp structs.JobRegisterResponse
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: job.TaskGroups[0].Name,
},
Count: pointer.Of(int64(requestedCount)),
Message: "count too high",
PolicyOverride: false,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
must.Error(t, err)
must.ErrorContains(t, err, "total count was greater than configured job_max_count: 6 > 4")
}

func TestJobEndpoint_Scale_NoEval(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down
3 changes: 3 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4327,6 +4327,9 @@ const (
// JobMaxPriority is the maximum allowed configuration value for maximum job priority
JobMaxPriority = math.MaxInt16 - 1

// JobDefaultMaxCount is the default maximum total task group counts per job
JobDefaultMaxCount = 50000

// CoreJobPriority should be higher than any user
// specified job so that it gets priority. This is important
// for the system to remain healthy.
Expand Down
8 changes: 8 additions & 0 deletions website/content/docs/configuration/server.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ server {
- `job_default_priority` `(int: 50)` - Specifies the default priority assigned to a job.
A valid value must be between `50` and `job_max_priority`.

- `job_max_count` `(int: 50000)` - Specifies the maximum number of allocations
for a job, as represented by the sum of its task group `count` fields. Jobs
of type `system` ignore this value. The child jobs of dispatched batch jobs
or periodic jobs are counted separately from their parent job. This value
must be non-negative. If set to 0, no limit is enforced. This value is enforced
at the time the job is submitted or scaled, and updating the value will not
impact existing jobs.

- `job_max_source_size` `(string: "1M")` - Specifies the size limit of the associated
job source content when registering a job. Note this is not a limit on the actual
size of a job. If the limit is exceeded, the original source is simply discarded
Expand Down
9 changes: 9 additions & 0 deletions website/content/docs/upgrade/upgrade-specific.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ being silently ignored. Any existing policies with duplicate or invalid keys
will continue to work, but the source policy document will need to be updated
to be valid before it can be written to Nomad.

#### Maximum number of allocations per job is limited by default

Nomad 1.11.0 limits the maximum number of allocations for a job to the value of
the new [`job_max_count`](/nomad/docs/configuration/server#job_max_count) server
configuration option, which defaults to 50000. The number of allocations is
determined from the sum of the job's task group `count` fields. This limit is
enforced at the time the job is submitted or scaled, and updating the value will
not impact existing jobs.

## Nomad 1.10.6

#### ACL policies no longer silently ignore duplicate or invalid keys
Expand Down