Skip to content

Commit

Permalink
Add CPU and RAM limits for Docker containers (#54)
Browse files Browse the repository at this point in the history
* Add CPU and RAM limits for Docker containers

* Add cpu and memory limit to the API
  • Loading branch information
robertjndw authored Feb 7, 2024
1 parent c913abc commit eea0537
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 26 deletions.
11 changes: 11 additions & 0 deletions HadesAPI/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ func AddBuildToQueue(c *gin.Context) {
return
}

// Check whether the request is valid
for _, step := range payload.QueuePayload.Steps {
if step.MemoryLimit != "" {
if _, err := utils.ParseMemoryLimit(step.MemoryLimit); err != nil {
log.WithError(err).Error("Failed to parse RAM limit")
c.String(http.StatusBadRequest, "Failed to parse RAM limit")
return
}
}
}

log.Debug("Received build request ", payload)
json_payload, err := json.Marshal(payload.QueuePayload)
if err != nil {
Expand Down
62 changes: 43 additions & 19 deletions HadesScheduler/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,65 @@ import (
log "github.com/sirupsen/logrus"
)

var cli *client.Client
var global_envs []string = []string{}
var container_autoremove bool
var DockerCfg DockerConfig

type Scheduler struct{}
type Scheduler struct {
cli *client.Client
script_executor string
container_autoremove bool
cpu_limit uint
memory_limit string
}

type DockerConfig struct {
DockerHost string `env:"DOCKER_HOST" envDefault:"unix:///var/run/docker.sock"`
ContainerAutoremove bool `env:"CONTAINER_AUTOREMOVE" envDefault:"true"`
DockerScriptExecutor string `env:"DOCKER_SCRIPT_EXECUTOR" envDefault:"/bin/bash -c"`
CPU_limit uint `env:"DOCKER_CPU_LIMIT"` // Number of CPUs - e.g. '6'
MEMORY_limit string `env:"DOCKER_MEMORY_LIMIT"` // RAM usage in g or m - e.g. '4g'
}

func init() {
utils.LoadConfig(&DockerCfg)
container_autoremove = DockerCfg.ContainerAutoremove
func NewDockerScheduler() Scheduler {
var dockerCfg DockerConfig
utils.LoadConfig(&dockerCfg)
log.Debugf("Docker config: %+v", dockerCfg)

var err error
// Create a new Docker client
cli, err = client.NewClientWithOpts(client.WithHost(DockerCfg.DockerHost), client.WithAPIVersionNegotiation())
cli, err := client.NewClientWithOpts(client.WithHost(dockerCfg.DockerHost), client.WithAPIVersionNegotiation())
if err != nil {
log.WithError(err).Fatal("Failed to create Docker client")
}
return Scheduler{
cli: cli,
container_autoremove: dockerCfg.ContainerAutoremove,
script_executor: dockerCfg.DockerScriptExecutor,
cpu_limit: dockerCfg.CPU_limit,
memory_limit: dockerCfg.MEMORY_limit,
}
}

func (d Scheduler) ScheduleJob(ctx context.Context, job payload.QueuePayload) error {
// Create a unique volume name for this job
volumeName := fmt.Sprintf("shared-%d", time.Now().UnixNano())
// Create the shared volume
if err := createSharedVolume(ctx, cli, volumeName); err != nil {
if err := createSharedVolume(ctx, d.cli, volumeName); err != nil {
log.WithError(err).Error("Failed to create shared volume")
return err
}

var global_envs []string
// Read the global env variables from the job metadata
for k, v := range job.Metadata {
global_envs = append(global_envs, fmt.Sprintf("%s=%s", k, v))
}

for _, step := range job.Steps {
executeStep(ctx, cli, step, volumeName)
d.executeStep(ctx, d.cli, step, volumeName, global_envs)
}

// Delete the shared volume after the job is done
defer func() {
time.Sleep(1 * time.Second)
if err := deleteSharedVolume(ctx, cli, volumeName); err != nil {
if err := deleteSharedVolume(ctx, d.cli, volumeName); err != nil {
log.WithError(err).Error("Failed to delete shared volume")
}

Expand All @@ -72,17 +84,17 @@ func (d Scheduler) ScheduleJob(ctx context.Context, job payload.QueuePayload) er
return nil
}

func executeStep(ctx context.Context, client *client.Client, step payload.Step, volumeName string) error {
func (d Scheduler) executeStep(ctx context.Context, client *client.Client, step payload.Step, volumeName string, envs []string) error {
// Pull the images
err := pullImages(ctx, cli, step.Image)
err := pullImages(ctx, d.cli, step.Image)
if err != nil {
log.WithError(err).Errorf("Failed to pull image %s", step.Image)
return err
}

// Copy the global envs and add the step specific ones
var envs []string
copy(envs, global_envs)
var step_envs []string
copy(step_envs, envs)
for k, v := range step.Metadata {
envs = append(envs, fmt.Sprintf("%s=%s", k, v))
}
Expand All @@ -101,13 +113,25 @@ func executeStep(ctx context.Context, client *client.Client, step payload.Step,
Target: "/shared",
},
},
AutoRemove: container_autoremove, // Remove the container after it is done only if the config is set to true
AutoRemove: d.container_autoremove, // Remove the container after it is done only if the config is set to true
}

// Limit the resource usage of the containers
cpu_limit := utils.FindLimit(int(step.CPULimit), int(d.cpu_limit))
if cpu_limit != 0 {
log.Debug("Setting CPU limit to ", cpu_limit)
host_config.Resources.NanoCPUs = int64(float64(cpu_limit) * 1e9)
}
ram_limit := utils.FindMemoryLimit(step.MemoryLimit, d.memory_limit)
if ram_limit != 0 {
log.Debug("Setting RAM limit to ", ram_limit)
host_config.Resources.Memory = int64(ram_limit)
}

// Create the bash script if there is one
if step.Script != "" {
// Overwrite the default entrypoint
container_config.Entrypoint = strings.Split(DockerCfg.DockerScriptExecutor, " ")
container_config.Entrypoint = strings.Split(d.script_executor, " ")
container_config.Entrypoint = append(container_config.Entrypoint, step.Script)
}

Expand Down
4 changes: 2 additions & 2 deletions HadesScheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func main() {
// scheduler = kube.Scheduler{}
case "docker":
log.Info("Started HadesScheduler in Docker mode")
scheduler = docker.Scheduler{}
scheduler = docker.NewDockerScheduler()
default:
log.Fatalf("Invalid executor specified: %s", executorCfg.Executor)
}

AsynqServer.Run(asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
log.Debug("Received task: ", t)
log.Debug("Received task: ", t.Type())
var job payload.QueuePayload
if err := json.Unmarshal(t.Payload(), &job); err != nil {
log.WithError(err).Error("Failed to unmarshal task payload")
Expand Down
12 changes: 7 additions & 5 deletions shared/payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ type QueuePayload struct {
}

type Step struct {
ID int `json:"id"`
Name string `json:"name"`
Image string `json:"image"`
Script string `json:"script"`
Metadata map[string]string `json:"metadata"`
ID int `json:"id"`
Name string `json:"name"`
Image string `json:"image"`
Script string `json:"script"`
Metadata map[string]string `json:"metadata"`
CPULimit uint `json:"cpu_limit"`
MemoryLimit string `json:"memory_limit"`
}
22 changes: 22 additions & 0 deletions shared/utils/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package utils

import (
"fmt"
"strconv"
"strings"

"github.com/caarlos0/env/v9"
"github.com/joho/godotenv"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -32,3 +36,21 @@ func LoadConfig(cfg interface{}) {

log.Debug("Config loaded: ", cfg)
}

func ParseMemoryLimit(limit string) (int64, error) {
unit := limit[len(limit)-1:]
number := limit[:len(limit)-1]
value, err := strconv.ParseInt(number, 10, 64)
if err != nil {
return 0, err
}

switch strings.ToUpper(unit) {
case "g", "G":
return value * 1024 * 1024 * 1024, nil
case "m", "M":
return value * 1024 * 1024, nil
default:
return 0, fmt.Errorf("unknown unit: %s", unit)
}
}
40 changes: 40 additions & 0 deletions shared/utils/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package utils

import log "github.com/sirupsen/logrus"

func FindLimit(x, y int) int {
if x == 0 {
return y
}
if y == 0 {
return x
}
if x < y {
return x
}
return y
}

// Gets two memory limits and returns the smaller one as number of bytes
func FindMemoryLimit(x, y string) int {
// Check the global RAM Limit
var global_ram_limit int64
if x != "" {
bytes, err := ParseMemoryLimit(x)
if err != nil {
log.WithError(err).Errorf("Failed to parse global RAM limit %s", x)
} else {
global_ram_limit = bytes
}
}
var step_ram_limit int64
if y != "" {
bytes, err := ParseMemoryLimit(y)
if err != nil {
log.WithError(err).Errorf("Failed to parse step RAM limit %s", y)
} else {
step_ram_limit = bytes
}
}
return FindLimit(int(step_ram_limit), int(global_ram_limit))
}

0 comments on commit eea0537

Please sign in to comment.