diff --git a/internal/command/create/create.go b/internal/command/create/create.go index 7f795bf9..00132764 100644 --- a/internal/command/create/create.go +++ b/internal/command/create/create.go @@ -10,7 +10,11 @@ func NewCommand() *cobra.Command { Short: "Create resources on the controller", } - command.AddCommand(newCreateVMCommand(), newCreateServiceAccount()) + command.AddCommand( + newCreateVMCommand(), + newCreateServiceAccount(), + newCreateImagePullJob(), + ) return command } diff --git a/internal/command/create/imagepulljob.go b/internal/command/create/imagepulljob.go new file mode 100644 index 00000000..c1cf635d --- /dev/null +++ b/internal/command/create/imagepulljob.go @@ -0,0 +1,56 @@ +package create + +import ( + "fmt" + "os" + + "github.com/cirruslabs/orchard/internal/simplename" + "github.com/cirruslabs/orchard/pkg/client" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/spf13/cobra" +) + +func newCreateImagePullJob() *cobra.Command { + command := &cobra.Command{ + Use: "imagepulljob NAME", + Short: "Create an image pull job", + RunE: runCreateImagePullJob, + Args: cobra.ExactArgs(1), + } + + command.Flags().StringVar(&image, "image", "", + "image to pull") + command.Flags().StringToStringVar(&labels, "labels", map[string]string{}, + "labels required by this image pull job") + + return command +} + +func runCreateImagePullJob(cmd *cobra.Command, args []string) error { + name := args[0] + + // Issue a warning if the name used will be invalid in the future + if err := simplename.ValidateNext(name); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "WARNING: %v\n", err) + } + + // Validate command-line arguments + if image == "" { + return fmt.Errorf("please specify an \"--image\" to pull") + } + + imagePullJob := &v1.ImagePullJob{ + Meta: v1.Meta{ + Name: name, + }, + Image: image, + Labels: labels, + } + + client, err := client.New() + if err != nil { + return err + } + + return client.ImagePullJobs().Create(cmd.Context(), imagePullJob) +} diff --git a/internal/command/deletecmd/delete.go b/internal/command/deletecmd/delete.go index 57afb507..540554b8 100644 --- a/internal/command/deletecmd/delete.go +++ b/internal/command/deletecmd/delete.go @@ -10,7 +10,13 @@ func NewCommand() *cobra.Command { Short: "Delete resources from the controller", } - command.AddCommand(newDeleteVMCommand(), newDeleteServiceComandCommand(), newDeleteWorkerCommand()) + command.AddCommand( + newDeleteVMCommand(), + newDeleteServiceComandCommand(), + newDeleteWorkerCommand(), + newDeleteImagePullCommand(), + newDeleteImagePullJobCommand(), + ) return command } diff --git a/internal/command/deletecmd/imagepull.go b/internal/command/deletecmd/imagepull.go new file mode 100644 index 00000000..4e6bd5a5 --- /dev/null +++ b/internal/command/deletecmd/imagepull.go @@ -0,0 +1,26 @@ +package deletecmd + +import ( + "github.com/cirruslabs/orchard/pkg/client" + "github.com/spf13/cobra" +) + +func newDeleteImagePullCommand() *cobra.Command { + return &cobra.Command{ + Use: "imagepull NAME", + Short: "Delete an image pull", + Args: cobra.ExactArgs(1), + RunE: runDeleteImagePull, + } +} + +func runDeleteImagePull(cmd *cobra.Command, args []string) error { + name := args[0] + + client, err := client.New() + if err != nil { + return err + } + + return client.ImagePulls().Delete(cmd.Context(), name) +} diff --git a/internal/command/deletecmd/imagepulljob.go b/internal/command/deletecmd/imagepulljob.go new file mode 100644 index 00000000..ba04448f --- /dev/null +++ b/internal/command/deletecmd/imagepulljob.go @@ -0,0 +1,26 @@ +package deletecmd + +import ( + "github.com/cirruslabs/orchard/pkg/client" + "github.com/spf13/cobra" +) + +func newDeleteImagePullJobCommand() *cobra.Command { + return &cobra.Command{ + Use: "imagepulljob NAME", + Short: "Delete an image pull job", + Args: cobra.ExactArgs(1), + RunE: runDeleteImagePullJob, + } +} + +func runDeleteImagePullJob(cmd *cobra.Command, args []string) error { + name := args[0] + + client, err := client.New() + if err != nil { + return err + } + + return client.ImagePullJobs().Delete(cmd.Context(), name) +} diff --git a/internal/command/list/imagepulljobs.go b/internal/command/list/imagepulljobs.go new file mode 100644 index 00000000..b54c2604 --- /dev/null +++ b/internal/command/list/imagepulljobs.go @@ -0,0 +1,53 @@ +package list + +import ( + "fmt" + + "github.com/cirruslabs/orchard/pkg/client" + "github.com/gosuri/uitable" + "github.com/spf13/cobra" +) + +func newListImagePullJobsCommand() *cobra.Command { + command := &cobra.Command{ + Use: "imagepulljobs", + Short: "List image pull jobs", + RunE: runListImagePullJobs, + } + + return command +} + +func runListImagePullJobs(cmd *cobra.Command, args []string) error { + client, err := client.New() + if err != nil { + return err + } + + imagePullJobs, err := client.ImagePullJobs().List(cmd.Context()) + if err != nil { + return err + } + + if quiet { + for _, imagePullJob := range imagePullJobs { + fmt.Println(imagePullJob.Name) + } + + return nil + } + + table := uitable.New() + table.Wrap = true + + table.AddRow("Name", "Image", "Labels", "Progressing", "Succeeded", "Failed", "Total") + + for _, imagePullJob := range imagePullJobs { + table.AddRow(imagePullJob.Name, imagePullJob.Image, imagePullJob.Labels, imagePullJob.Progressing, + imagePullJob.Succeeded, imagePullJob.Failed, imagePullJob.Total) + } + + fmt.Println(table) + + return nil +} diff --git a/internal/command/list/imagepulls.go b/internal/command/list/imagepulls.go new file mode 100644 index 00000000..2b5886e5 --- /dev/null +++ b/internal/command/list/imagepulls.go @@ -0,0 +1,54 @@ +package list + +import ( + "fmt" + + "github.com/cirruslabs/orchard/pkg/client" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/gosuri/uitable" + "github.com/spf13/cobra" +) + +func newListImagePullsCommand() *cobra.Command { + command := &cobra.Command{ + Use: "imagepulls", + Short: "List image pulls", + RunE: runListImagePulls, + } + + return command +} + +func runListImagePulls(cmd *cobra.Command, args []string) error { + client, err := client.New() + if err != nil { + return err + } + + imagePulls, err := client.ImagePulls().List(cmd.Context()) + if err != nil { + return err + } + + if quiet { + for _, imagePull := range imagePulls { + fmt.Println(imagePull.Name) + } + + return nil + } + + table := uitable.New() + table.Wrap = true + + table.AddRow("Name", "Image", "Worker", "Conditions") + + for _, imagePullJob := range imagePulls { + table.AddRow(imagePullJob.Name, imagePullJob.Image, imagePullJob.Worker, + v1.ConditionsHumanize(imagePullJob.Conditions)) + } + + fmt.Println(table) + + return nil +} diff --git a/internal/command/list/list.go b/internal/command/list/list.go index 18e05189..635ee09d 100644 --- a/internal/command/list/list.go +++ b/internal/command/list/list.go @@ -12,7 +12,13 @@ func NewCommand() *cobra.Command { Short: "List resources on the controller", } - command.AddCommand(newListWorkersCommand(), newListVMsCommand(), newListServiceAccountsCommand()) + command.AddCommand( + newListWorkersCommand(), + newListVMsCommand(), + newListServiceAccountsCommand(), + newListImagePullsCommand(), + newListImagePullJobsCommand(), + ) command.Flags().BoolVarP(&quiet, "", "q", false, "only show resource names") diff --git a/internal/controller/api.go b/internal/controller/api.go index b8697ca4..e786844e 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -183,6 +183,37 @@ func (controller *Controller) initAPI() *gin.Engine { controller.appendVMEvents(c).Respond(c) }) + // Image pulls + v1.POST("/imagepulls", func(c *gin.Context) { + controller.createImagePull(c).Respond(c) + }) + v1.PUT("/imagepulls/:name/state", func(c *gin.Context) { + controller.updateImagePullState(c).Respond(c) + }) + v1.GET("/imagepulls/:name", func(c *gin.Context) { + controller.getImagePull(c).Respond(c) + }) + v1.GET("/imagepulls", func(c *gin.Context) { + controller.listImagePulls(c).Respond(c) + }) + v1.DELETE("/imagepulls/:name", func(c *gin.Context) { + controller.deleteImagePull(c).Respond(c) + }) + + // Image pull jobs + v1.POST("/imagepulljobs", func(c *gin.Context) { + controller.createImagePullJob(c).Respond(c) + }) + v1.GET("/imagepulljobs/:name", func(c *gin.Context) { + controller.getImagePullJob(c).Respond(c) + }) + v1.GET("/imagepulljobs", func(c *gin.Context) { + controller.listImagePullJobs(c).Respond(c) + }) + v1.DELETE("/imagepulljobs/:name", func(c *gin.Context) { + controller.deleteImagePullJob(c).Respond(c) + }) + return ginEngine } diff --git a/internal/controller/api_controller.go b/internal/controller/api_controller.go index b67ca2a8..9d92d431 100644 --- a/internal/controller/api_controller.go +++ b/internal/controller/api_controller.go @@ -19,6 +19,7 @@ func (controller *Controller) controllerInfo(ctx *gin.Context) responder.Respond capabilities := []v1pkg.ControllerCapability{ v1pkg.ControllerCapabilityRPCV1, v1pkg.ControllerCapabilityVMStateEndpoint, + v1pkg.ControllerCapabilityImagePullResource, } if controller.experimentalRPCV2 { diff --git a/internal/controller/api_imagepulljobs.go b/internal/controller/api_imagepulljobs.go new file mode 100644 index 00000000..c2be77df --- /dev/null +++ b/internal/controller/api_imagepulljobs.go @@ -0,0 +1,124 @@ +package controller + +import ( + "errors" + "net/http" + "time" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/internal/responder" + "github.com/cirruslabs/orchard/internal/simplename" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +func (controller *Controller) createImagePullJob(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + // Parse user input + var userPullJob v1.ImagePullJob + + if err := ctx.ShouldBindJSON(&userPullJob); err != nil { + return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided")) + } + + // Validate user input + if userPullJob.Name == "" { + return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("name field cannot be empty")) + } else if err := simplename.Validate(userPullJob.Name); err != nil { + return responder.JSON(http.StatusPreconditionFailed, + NewErrorResponse("name field %v", err)) + } + if userPullJob.Image == "" { + return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("image field cannot be empty")) + } + + // Provide defaults + userPullJob.CreatedAt = time.Now() + userPullJob.UID = uuid.NewString() + + response := controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { + // Does this resource already exists? + _, err := txn.GetImagePullJob(userPullJob.Name) + if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + controller.logger.Errorf("failed to check if the image pull job exists in the DB: %v", err) + + return responder.Code(http.StatusInternalServerError) + } + if err == nil { + return responder.JSON(http.StatusConflict, NewErrorResponse("image pull job with this name "+ + "already exists")) + } + + if err := txn.SetImagePullJob(userPullJob); err != nil { + controller.logger.Errorf("failed to create image pull job in the DB: %v", err) + + return responder.Code(http.StatusInternalServerError) + } + + return responder.JSON(http.StatusOK, &userPullJob) + }) + + return response +} + +func (controller *Controller) getImagePullJob(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil { + return responder + } + + name := ctx.Param("name") + + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { + dbPullJob, err := txn.GetImagePullJob(name) + if err != nil { + return responder.Error(err) + } + + return responder.JSON(http.StatusOK, dbPullJob) + }) +} + +func (controller *Controller) listImagePullJobs(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil { + return responder + } + + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { + dbPullJobs, err := txn.ListImagePullJobs() + if err != nil { + return responder.Error(err) + } + + return responder.JSON(http.StatusOK, dbPullJobs) + }) +} + +func (controller *Controller) deleteImagePullJob(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + name := ctx.Param("name") + + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { + _, err := txn.GetImagePullJob(name) + if err != nil { + return responder.Error(err) + } + + err = txn.DeleteImagePullJob(name) + if err != nil { + return responder.Error(err) + } + + return responder.Code(http.StatusOK) + }) +} diff --git a/internal/controller/api_imagepulls.go b/internal/controller/api_imagepulls.go new file mode 100644 index 00000000..6dba9c5f --- /dev/null +++ b/internal/controller/api_imagepulls.go @@ -0,0 +1,159 @@ +package controller + +import ( + "errors" + "net/http" + "time" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/internal/responder" + "github.com/cirruslabs/orchard/internal/simplename" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +func (controller *Controller) createImagePull(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + // Parse user input + var userPull v1.ImagePull + + if err := ctx.ShouldBindJSON(&userPull); err != nil { + return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided")) + } + + // Validate user input + if userPull.Name == "" { + return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("name field cannot is empty")) + } else if err := simplename.Validate(userPull.Name); err != nil { + return responder.JSON(http.StatusPreconditionFailed, + NewErrorResponse("name field %v", err)) + } + if userPull.Image == "" { + return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("image field cannot be empty")) + } + if userPull.Worker == "" { + return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("worker field cannot be empty")) + } + + // Provide defaults + userPull.CreatedAt = time.Now() + userPull.UID = uuid.NewString() + + response := controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { + // Does this resource already exists? + _, err := txn.GetImagePull(userPull.Name) + if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + controller.logger.Errorf("failed to check if the image pull exists in the DB: %v", err) + + return responder.Code(http.StatusInternalServerError) + } + if err == nil { + return responder.JSON(http.StatusConflict, NewErrorResponse("image pull with this name already exists")) + } + + if err := txn.SetImagePull(userPull); err != nil { + controller.logger.Errorf("failed to create image pull in the DB: %v", err) + + return responder.Code(http.StatusInternalServerError) + } + + return responder.JSON(http.StatusOK, &userPull) + }) + + return response +} + +func (controller *Controller) updateImagePullState(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + // Parse user input + var userPull v1.ImagePull + + if err := ctx.ShouldBindJSON(&userPull); err != nil { + return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided")) + } + + name := ctx.Param("name") + + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { + dbPull, err := txn.GetImagePull(name) + if err != nil { + return responder.Error(err) + } + + dbPull.PullState = userPull.PullState + + if err := txn.SetImagePull(*dbPull); err != nil { + controller.logger.Errorf("failed to update image pull in the DB: %v", err) + + return responder.Code(http.StatusInternalServerError) + } + + return responder.JSON(http.StatusOK, dbPull) + }) +} + +func (controller *Controller) getImagePull(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil { + return responder + } + + name := ctx.Param("name") + + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { + dbPull, err := txn.GetImagePull(name) + if err != nil { + return responder.Error(err) + } + + return responder.JSON(http.StatusOK, dbPull) + }) +} + +func (controller *Controller) listImagePulls(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil { + return responder + } + + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { + dbPulls, err := txn.ListImagePulls() + if err != nil { + return responder.Error(err) + } + + return responder.JSON(http.StatusOK, dbPulls) + }) +} + +func (controller *Controller) deleteImagePull(ctx *gin.Context) responder.Responder { + // Auth + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + name := ctx.Param("name") + + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { + _, err := txn.GetImagePull(name) + if err != nil { + return responder.Error(err) + } + + err = txn.DeleteImagePull(name) + if err != nil { + return responder.Error(err) + } + + return responder.Code(http.StatusOK) + }) +} diff --git a/internal/controller/scheduler/imagepull.go b/internal/controller/scheduler/imagepull.go new file mode 100644 index 00000000..195113fa --- /dev/null +++ b/internal/controller/scheduler/imagepull.go @@ -0,0 +1,270 @@ +package scheduler + +import ( + "errors" + "fmt" + "time" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "github.com/samber/lo" +) + +func (scheduler *Scheduler) imagePullLoopIteration() error { + // Get a lagging view of image pulls, image pull jobs and workers + var imagePulls []v1.ImagePull + var imagePullJobs []v1.ImagePullJob + var workers []v1.Worker + + if err := scheduler.store.View(func(txn storepkg.Transaction) error { + var err error + + imagePulls, err = txn.ListImagePulls() + if err != nil { + return err + } + + imagePullJobs, err = txn.ListImagePullJobs() + if err != nil { + return err + } + + workers, err = txn.ListWorkers() + if err != nil { + return err + } + + return nil + }); err != nil { + return err + } + + scheduler.logger.Debugf("processing %d image pull jobs, %d image pulls and %d workers", + len(imagePullJobs), len(imagePulls), len(workers)) + + // Schedule new image pulls and update image pull job states + imagePullJobIndex := map[string]v1.ImagePullJob{} + + for _, imagePullJob := range imagePullJobs { + imagePullJobIndex[imagePullJob.Name] = imagePullJob + + // Schedule new image pulls + existingImagePulls := lo.Filter(imagePulls, func(imagePull v1.ImagePull, _ int) bool { + return lo.ContainsBy(imagePull.OwnerReferences, func(ownerReference v1.OwnerReference) bool { + return ownerReference == imagePullJob.OwnerReference() + }) + }) + + for _, worker := range workers { + // Should we create an image pull for this worker? + if !worker.Labels.Contains(imagePullJob.Labels) { + continue + } + + // Have we already created an image pull for this worker? + if _, ok := lo.Find(existingImagePulls, func(imagePull v1.ImagePull) bool { + return imagePull.Worker == worker.Name + }); ok { + continue + } + + // Create an image pull for this worker + scheduler.logger.Debugf("creating image pull for job %s and worker %s", + imagePullJob.Name, worker.Name) + + newImagePull, err := scheduler.createImagePull(imagePullJob, worker) + if err != nil { + return err + } + + existingImagePulls = append(existingImagePulls, *newImagePull) + } + + // Craft the current image pull job state + newImagePullJobState := v1.ImagePullJobState{ + Total: int64(len(existingImagePulls)), + Progressing: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool { + return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeProgressing) + })), + Succeeded: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool { + return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted) + })), + Failed: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool { + return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeFailed) + })), + } + + if newImagePullJobState.Progressing != 0 { + newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{ + Type: v1.ConditionTypeProgressing, + State: v1.ConditionStateTrue, + }) + } + + if newImagePullJobState.Failed != 0 { + newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{ + Type: v1.ConditionTypeFailed, + State: v1.ConditionStateTrue, + }) + } else if (newImagePullJobState.Succeeded + newImagePullJobState.Failed) == newImagePullJobState.Total { + newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{ + Type: v1.ConditionTypeCompleted, + State: v1.ConditionStateTrue, + }) + } + + // Is the current image pull job state any different? + if cmp.Equal(imagePullJob.ImagePullJobState, newImagePullJobState) { + continue + } + + // Update image pull job state + if err := scheduler.store.Update(func(txn storepkg.Transaction) error { + dbPullJob, err := txn.GetImagePullJob(imagePullJob.Name) + if err != nil { + // Is the image pull job still exists? + if errors.Is(err, storepkg.ErrNotFound) { + return nil + } + + return err + } + + // Is it the same image pull job? + if dbPullJob.UID != imagePullJob.UID { + return nil + } + + dbPullJob.ImagePullJobState = newImagePullJobState + + return txn.SetImagePullJob(*dbPullJob) + }); err != nil { + return err + } + } + + // Garbage collect orphaned image pulls + for _, imagePull := range imagePulls { + // Is this image pull controlled by an image pull job? + imagePullJobOwnerReferences := lo.Filter(imagePull.OwnerReferences, func(ownerReference v1.OwnerReference, _ int) bool { + return ownerReference.Kind == v1.KindImagePullJob + }) + + if len(imagePullJobOwnerReferences) == 0 { + continue + } + + // Does this image pull has any invalid references? + hasOrphanedPullJobOwnerReferences := lo.ContainsBy(imagePullJobOwnerReferences, func(ownerReference v1.OwnerReference) bool { + imagePullJob, ok := imagePullJobIndex[ownerReference.Name] + if !ok { + return true + } + + return ownerReference != imagePullJob.OwnerReference() + }) + + if !hasOrphanedPullJobOwnerReferences { + continue + } + + scheduler.logger.Debugf("removing image pull %s with non-existent owner reference", imagePull.Name) + + if err := scheduler.store.Update(func(txn storepkg.Transaction) error { + // Is this image pull still exists? + dbImagePull, err := txn.GetImagePull(imagePull.Name) + if err != nil { + if errors.Is(err, storepkg.ErrNotFound) { + scheduler.logger.Warnf("image pull %s is gone, perhaps the user "+ + "manually deleted it?", imagePull.Name) + return nil + } + + return err + } + + // Is this the same image pull? + if imagePull.UID != dbImagePull.UID { + return nil + } + + return txn.DeleteImagePull(imagePull.Name) + }); err != nil { + return err + } + } + + return nil +} + +func (scheduler *Scheduler) createImagePull(imagePullJob v1.ImagePullJob, worker v1.Worker) (*v1.ImagePull, error) { + var imagePull v1.ImagePull + + if err := scheduler.store.Update(func(txn storepkg.Transaction) error { + dbPullJob, err := txn.GetImagePullJob(imagePullJob.Name) + if err != nil { + // Is the image pull job still exists? + if errors.Is(err, storepkg.ErrNotFound) { + return nil + } + + return err + } + + // Is it the same pull job? + if dbPullJob.UID != imagePullJob.UID { + return nil + } + + dbWorker, err := txn.GetWorker(worker.Name) + if err != nil { + // Is the worker still exists? + if errors.Is(err, storepkg.ErrNotFound) { + return nil + } + + return err + } + + // Do the worker labels still match? + if !dbWorker.Labels.Contains(dbPullJob.Labels) { + return nil + } + + imagePull = v1.ImagePull{ + Meta: v1.Meta{ + Name: fmt.Sprintf("%s-%s", dbPullJob.Name, dbWorker.Name), + CreatedAt: time.Now(), + }, + UID: uuid.NewString(), + OwnerReferences: []v1.OwnerReference{ + dbPullJob.OwnerReference(), + }, + Image: dbPullJob.Image, + Worker: worker.Name, + } + + _, err = txn.GetImagePull(imagePull.Name) + if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + return err + } + if !errors.Is(err, storepkg.ErrNotFound) { + scheduler.logger.Warnf("image pull %s already exists, perhaps the user "+ + "manually created it?", imagePull.Name) + + return nil + } + + if err := txn.SetImagePull(imagePull); err != nil { + return err + } + + return nil + }); err != nil { + return nil, err + } + + return &imagePull, nil +} diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index 4ac1f3d7..2c3d1abb 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -99,16 +99,24 @@ func (scheduler *Scheduler) Run() { scheduler.logger.Errorf("Failed to health-check VMs: %v", err) } + imagePullLoopIterationStart := time.Now() + err = scheduler.imagePullLoopIteration() + imagePullLoopIterationEnd := time.Now() + if err != nil { + scheduler.logger.Errorf("Failed to process image pulls and image pull jobs: %v", err) + } + schedulingLoopIterationStart := time.Now() numWorkersScheduling, numVMsScheduling, err := scheduler.schedulingLoopIteration() schedulingLoopIterationEnd := time.Now() scheduler.logger.Debugf("Health checking loop iteration for %d workers and %d VMs took %v, "+ - "scheduling loop iteration for %d workers and %d VMs took %v", + "scheduling loop iteration for %d workers and %d VMs took %v, image pull loop iteration took %v", numWorkersHealth, numVMsHealth, healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart), numWorkersScheduling, numVMsScheduling, - schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart)) + schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart), + imagePullLoopIterationEnd.Sub(imagePullLoopIterationStart)) if err != nil { scheduler.logger.Errorf("Failed to schedule VMs: %v", err) diff --git a/internal/controller/store/badger/badger_pull.go b/internal/controller/store/badger/badger_pull.go new file mode 100644 index 00000000..19ab01b5 --- /dev/null +++ b/internal/controller/store/badger/badger_pull.go @@ -0,0 +1,30 @@ +//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future +package badger + +import ( + "path" + + "github.com/cirruslabs/orchard/pkg/resource/v1" +) + +const SpaceImagePulls = "/imagepulls" + +func ImagePullKey(name string) []byte { + return []byte(path.Join(SpaceImagePulls, name)) +} + +func (txn *Transaction) GetImagePull(name string) (*v1.ImagePull, error) { + return genericGet[v1.ImagePull](txn, ImagePullKey(name)) +} + +func (txn *Transaction) SetImagePull(pull v1.ImagePull) error { + return genericSet[v1.ImagePull](txn, ImagePullKey(pull.Name), pull) +} + +func (txn *Transaction) DeleteImagePull(name string) error { + return genericDelete(txn, ImagePullKey(name)) +} + +func (txn *Transaction) ListImagePulls() ([]v1.ImagePull, error) { + return genericList[v1.ImagePull](txn, []byte(SpaceImagePulls)) +} diff --git a/internal/controller/store/badger/badger_pulljob.go b/internal/controller/store/badger/badger_pulljob.go new file mode 100644 index 00000000..24b5a70b --- /dev/null +++ b/internal/controller/store/badger/badger_pulljob.go @@ -0,0 +1,30 @@ +//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future +package badger + +import ( + "path" + + "github.com/cirruslabs/orchard/pkg/resource/v1" +) + +const SpaceImagePullJobs = "/imagepulljobs" + +func ImagePullJobKey(name string) []byte { + return []byte(path.Join(SpaceImagePullJobs, name)) +} + +func (txn *Transaction) GetImagePullJob(name string) (*v1.ImagePullJob, error) { + return genericGet[v1.ImagePullJob](txn, ImagePullJobKey(name)) +} + +func (txn *Transaction) SetImagePullJob(pull v1.ImagePullJob) error { + return genericSet[v1.ImagePullJob](txn, ImagePullJobKey(pull.Name), pull) +} + +func (txn *Transaction) DeleteImagePullJob(name string) error { + return genericDelete(txn, ImagePullJobKey(name)) +} + +func (txn *Transaction) ListImagePullJobs() ([]v1.ImagePullJob, error) { + return genericList[v1.ImagePullJob](txn, []byte(SpaceImagePullJobs)) +} diff --git a/internal/controller/store/store.go b/internal/controller/store/store.go index c62f6e2e..88a3b2fa 100644 --- a/internal/controller/store/store.go +++ b/internal/controller/store/store.go @@ -47,4 +47,14 @@ type Transaction interface { GetClusterSettings() (*v1.ClusterSettings, error) SetClusterSettings(clusterSettings v1.ClusterSettings) error + + GetImagePull(name string) (result *v1.ImagePull, err error) + SetImagePull(pull v1.ImagePull) (err error) + DeleteImagePull(name string) (err error) + ListImagePulls() (result []v1.ImagePull, err error) + + GetImagePullJob(name string) (result *v1.ImagePullJob, err error) + SetImagePullJob(pull v1.ImagePullJob) (err error) + DeleteImagePullJob(name string) (err error) + ListImagePullJobs() (result []v1.ImagePullJob, err error) } diff --git a/internal/tests/imagepull_test.go b/internal/tests/imagepull_test.go new file mode 100644 index 00000000..ff9b8e56 --- /dev/null +++ b/internal/tests/imagepull_test.go @@ -0,0 +1,96 @@ +package tests + +import ( + "testing" + "time" + + "github.com/cirruslabs/orchard/internal/imageconstant" + "github.com/cirruslabs/orchard/internal/tests/devcontroller" + "github.com/cirruslabs/orchard/internal/tests/wait" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" +) + +func TestImagePull(t *testing.T) { + devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t) + + // Determine the worker name that we'll target + workers, err := devClient.Workers().List(t.Context()) + require.NoError(t, err) + require.Len(t, workers, 1) + + // Create an image pull + imagePullName := "test" + + err = devClient.ImagePulls().Create(t.Context(), &v1.ImagePull{ + Meta: v1.Meta{ + Name: imagePullName, + }, + Image: imageconstant.DefaultMacosImage, + Worker: workers[0].Name, + }) + require.NoError(t, err) + + // Wait for the image pull to enter terminal state + var imagePull *v1.ImagePull + + require.True(t, wait.Wait(2*time.Minute, func() bool { + imagePull, err = devClient.ImagePulls().Get(t.Context(), imagePullName) + require.NoError(t, err) + + t.Logf("Waiting for the image pull to enter terminal state. Current conditions: %s.", + v1.ConditionsHumanize(imagePull.Conditions)) + + return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted) || + v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeFailed) + }), "failed to wait for image pull to enter terminal state") + + // Ensure that image pull succeeded + require.True(t, v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted)) +} + +func TestImagePullJob(t *testing.T) { + devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t) + + // Determine the worker name that we'll target + workers, err := devClient.Workers().List(t.Context()) + require.NoError(t, err) + require.Len(t, workers, 1) + + // Create an image pull job + imagePullJobName := "test" + + err = devClient.ImagePullJobs().Create(t.Context(), &v1.ImagePullJob{ + Meta: v1.Meta{ + Name: imagePullJobName, + }, + Image: imageconstant.DefaultMacosImage, + }) + require.NoError(t, err) + + // Wait for the image pull job to be completed + var imagePullJob *v1.ImagePullJob + + require.True(t, wait.Wait(2*time.Minute, func() bool { + imagePullJob, err = devClient.ImagePullJobs().Get(t.Context(), imagePullJobName) + require.NoError(t, err) + + t.Logf("Waiting for the image pull job to enter terminal state. Current conditions: %s.", + v1.ConditionsHumanize(imagePullJob.Conditions)) + + return v1.ConditionIsTrue(imagePullJob.Conditions, v1.ConditionTypeCompleted) || + v1.ConditionIsTrue(imagePullJob.Conditions, v1.ConditionTypeFailed) + }), "failed to wait for image pull to enter terminal state") + + // Ensure that image pull had succeeded + require.Equal(t, []v1.Condition{ + { + Type: v1.ConditionTypeCompleted, + State: v1.ConditionStateTrue, + }, + }, imagePullJob.Conditions) + require.EqualValues(t, 0, imagePullJob.Progressing) + require.EqualValues(t, 1, imagePullJob.Succeeded) + require.EqualValues(t, 0, imagePullJob.Failed) + require.EqualValues(t, 1, imagePullJob.Total) +} diff --git a/internal/worker/imagepull.go b/internal/worker/imagepull.go new file mode 100644 index 00000000..20208f6c --- /dev/null +++ b/internal/worker/imagepull.go @@ -0,0 +1,167 @@ +package worker + +import ( + "context" + "errors" + + "github.com/cirruslabs/orchard/internal/worker/tart" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + mapset "github.com/deckarep/golang-set/v2" + "github.com/samber/mo" +) + +type ImagePull struct { + Cancel context.CancelFunc +} + +func (worker *Worker) syncPulls(ctx context.Context) error { + allKeys := mapset.NewSet[string]() + + remotePulls, err := worker.client.ImagePulls().FindForWorker(ctx, worker.name) + if err != nil { + return err + } + + remotePullsIndex := map[string]*v1.ImagePull{} + for _, remotePull := range remotePulls { + // A copy is needed to not reference the loop variable + remotePullCopy := remotePull + + remotePullsIndex[remotePull.Name] = &remotePullCopy + allKeys.Add(remotePull.Name) + } + + localPullsIndex := map[string]*ImagePull{} + for key, localPull := range worker.imagePulls { + localPullsIndex[key] = localPull + allKeys.Add(key) + } + + worker.logger.Infof("syncing %d local image pulls against %d remote image pulls...", + len(localPullsIndex), len(remotePullsIndex)) + + for key := range allKeys.Iter() { + remotePull := mo.PointerToOption(remotePullsIndex[key]) + localPull := mo.PointerToOption(localPullsIndex[key]) + + switch { + case remotePull.IsSome() && localPull.IsNone(): + // No need to do anything about the image pull in terminal state + remotePullConditions := remotePull.MustGet().Conditions + + if v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) || + v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) { + continue + } + + // Image pull exists remotely, but not locally, + // create and start a new local pull + pullCtx, pullCtxCancel := context.WithCancel(ctx) + + newLocalPull := &ImagePull{ + Cancel: pullCtxCancel, + } + + go func() { + defer pullCtxCancel() + + worker.performPull(pullCtx, key, remotePull.MustGet().Image) + }() + + worker.imagePulls[key] = newLocalPull + case remotePull.IsNone() && localPull.IsSome(): + // Pull exists locally, but not remotely, + // terminate and delete the local pull + localPull.MustGet().Cancel() + delete(worker.imagePulls, key) + case remotePull.IsSome() && localPull.IsSome(): + // Terminate local pull when remote pull enters terminal state + remotePullConditions := remotePull.MustGet().Conditions + + if !v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) && + !v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) { + continue + } + + localPull.MustGet().Cancel() + delete(worker.imagePulls, key) + } + } + + return nil +} + +func (worker *Worker) performPull(ctx context.Context, name string, image string) { + _, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{ + Meta: v1.Meta{ + Name: name, + }, + PullState: v1.PullState{ + Conditions: []v1.Condition{ + { + Type: v1.ConditionTypeProgressing, + State: v1.ConditionStateTrue, + }, + }, + }, + }) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + worker.logger.Errorf("failed to update image pull state: %v", err) + + return + } + + _, _, err = tart.Tart(ctx, worker.logger, "pull", image) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + worker.logger.Errorf("failed to pull image %s: %v", image, err) + + _, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{ + Meta: v1.Meta{ + Name: name, + }, + PullState: v1.PullState{ + Conditions: []v1.Condition{ + { + Type: v1.ConditionTypeFailed, + State: v1.ConditionStateTrue, + }, + }, + }, + }) + if err != nil { + worker.logger.Errorf("failed to update image pull state: %v", err) + } + + return + } + + if _, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{ + Meta: v1.Meta{ + Name: name, + }, + PullState: v1.PullState{ + Conditions: []v1.Condition{ + { + Type: v1.ConditionTypeCompleted, + State: v1.ConditionStateTrue, + }, + }, + }, + }); err != nil { + if errors.Is(err, context.Canceled) { + return + } + + worker.logger.Errorf("failed to update image pull state: %v", err) + + return + } +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 9c90ac5c..2f480880 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -51,6 +51,8 @@ type Worker struct { localNetworkHelper *localnetworkhelper.LocalNetworkHelper + imagePulls map[string]*ImagePull + logger *zap.SugaredLogger } @@ -60,6 +62,7 @@ func New(client *client.Client, opts ...Option) (*Worker, error) { pollTicker: time.NewTicker(pollInterval), vmm: vmmanager.New(), syncRequested: make(chan bool, 1), + imagePulls: map[string]*ImagePull{}, } // Apply options @@ -215,6 +218,14 @@ func (worker *Worker) runNewSession(ctx context.Context) error { return nil } + if info.Capabilities.Has(v1.ControllerCapabilityImagePullResource) { + if err := worker.syncPulls(subCtx); err != nil { + worker.logger.Warnf("failed to sync image pulls: %v", err) + + return nil + } + } + select { case <-worker.syncRequested: case <-worker.pollTicker.C: diff --git a/pkg/client/client.go b/pkg/client/client.go index f7f6315c..5930afbf 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -349,6 +349,18 @@ func (client *Client) ClusterSettings() *ClusterSettingsService { } } +func (client *Client) ImagePulls() *ImagePullsService { + return &ImagePullsService{ + client: client, + } +} + +func (client *Client) ImagePullJobs() *ImagePullJobsService { + return &ImagePullJobsService{ + client: client, + } +} + func (client *Client) RPC() *RPCService { return &RPCService{ client: client, diff --git a/pkg/client/imagepulljobs.go b/pkg/client/imagepulljobs.go new file mode 100644 index 00000000..066ce5a2 --- /dev/null +++ b/pkg/client/imagepulljobs.go @@ -0,0 +1,82 @@ +package client + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "github.com/cirruslabs/orchard/pkg/resource/v1" +) + +const imagePullJobsEndpointPrefix = "imagepulljobs" + +type ImagePullJobsService struct { + client *Client +} + +func (service *ImagePullJobsService) Create(ctx context.Context, pullJob *v1.ImagePullJob) error { + err := service.client.request(ctx, http.MethodPost, imagePullJobsEndpointPrefix, pullJob, nil, nil) + if err != nil { + return err + } + + return nil +} + +func (service *ImagePullJobsService) List(ctx context.Context) ([]v1.ImagePullJob, error) { + var pullJobs []v1.ImagePullJob + + err := service.client.request(ctx, http.MethodGet, imagePullJobsEndpointPrefix, nil, &pullJobs, nil) + if err != nil { + return nil, err + } + + return pullJobs, nil +} + +func (service *ImagePullJobsService) Get(ctx context.Context, name string) (*v1.ImagePullJob, error) { + var pullJob v1.ImagePullJob + + err := service.client.request(ctx, http.MethodGet, fmt.Sprintf("%s/%s", imagePullJobsEndpointPrefix, + url.PathEscape(name)), nil, &pullJob, nil) + if err != nil { + return nil, err + } + + return &pullJob, nil +} + +func (service *ImagePullJobsService) Update(ctx context.Context, pull v1.ImagePullJob) (*v1.ImagePullJob, error) { + var updatedPullJob v1.ImagePullJob + + err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("%s/%s", imagePullJobsEndpointPrefix, + url.PathEscape(pull.Name)), pull, &updatedPullJob, nil) + if err != nil { + return &updatedPullJob, err + } + + return &updatedPullJob, nil +} + +func (service *ImagePullJobsService) UpdateState(ctx context.Context, pull v1.ImagePullJob) (*v1.ImagePullJob, error) { + var updatedPullJob v1.ImagePullJob + + err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("%s/%s/state", imagePullJobsEndpointPrefix, + url.PathEscape(pull.Name)), pull, &updatedPullJob, nil) + if err != nil { + return &updatedPullJob, err + } + + return &updatedPullJob, nil +} + +func (service *ImagePullJobsService) Delete(ctx context.Context, name string) error { + err := service.client.request(ctx, http.MethodDelete, fmt.Sprintf("%s/%s", imagePullJobsEndpointPrefix, + url.PathEscape(name)), nil, nil, nil) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/client/imagepulls.go b/pkg/client/imagepulls.go new file mode 100644 index 00000000..7e74a24c --- /dev/null +++ b/pkg/client/imagepulls.go @@ -0,0 +1,101 @@ +package client + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "github.com/cirruslabs/orchard/pkg/resource/v1" +) + +const pullsEndpointPrefix = "imagepulls" + +type ImagePullsService struct { + client *Client +} + +func (service *ImagePullsService) Create(ctx context.Context, pull *v1.ImagePull) error { + err := service.client.request(ctx, http.MethodPost, pullsEndpointPrefix, pull, nil, nil) + if err != nil { + return err + } + + return nil +} + +func (service *ImagePullsService) FindForWorker(ctx context.Context, worker string) ([]v1.ImagePull, error) { + allPulls, err := service.List(ctx) + if err != nil { + return nil, err + } + + var result []v1.ImagePull + + for _, pull := range allPulls { + if pull.Worker != worker { + continue + } + + result = append(result, pull) + } + + return result, nil +} + +func (service *ImagePullsService) List(ctx context.Context) ([]v1.ImagePull, error) { + var pulls []v1.ImagePull + + err := service.client.request(ctx, http.MethodGet, pullsEndpointPrefix, nil, &pulls, nil) + if err != nil { + return nil, err + } + + return pulls, nil +} + +func (service *ImagePullsService) Get(ctx context.Context, name string) (*v1.ImagePull, error) { + var pull v1.ImagePull + + err := service.client.request(ctx, http.MethodGet, fmt.Sprintf("%s/%s", pullsEndpointPrefix, + url.PathEscape(name)), nil, &pull, nil) + if err != nil { + return nil, err + } + + return &pull, nil +} + +func (service *ImagePullsService) Update(ctx context.Context, pull v1.ImagePull) (*v1.ImagePull, error) { + var updatedPull v1.ImagePull + + err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("%s/%s", pullsEndpointPrefix, + url.PathEscape(pull.Name)), pull, &updatedPull, nil) + if err != nil { + return &updatedPull, err + } + + return &updatedPull, nil +} + +func (service *ImagePullsService) UpdateState(ctx context.Context, pull v1.ImagePull) (*v1.ImagePull, error) { + var updatedPull v1.ImagePull + + err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("%s/%s/state", pullsEndpointPrefix, + url.PathEscape(pull.Name)), pull, &updatedPull, nil) + if err != nil { + return &updatedPull, err + } + + return &updatedPull, nil +} + +func (service *ImagePullsService) Delete(ctx context.Context, name string) error { + err := service.client.request(ctx, http.MethodDelete, fmt.Sprintf("%s/%s", pullsEndpointPrefix, + url.PathEscape(name)), nil, nil, nil) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/resource/v1/imagepull.go b/pkg/resource/v1/imagepull.go new file mode 100644 index 00000000..68bbcef7 --- /dev/null +++ b/pkg/resource/v1/imagepull.go @@ -0,0 +1,26 @@ +package v1 + +type ImagePull struct { + Meta + + // UID is a useful field for avoiding data races within a single Name. + // + // It is populated by the Controller when receiving a POST request. + UID string `json:"uid,omitempty"` + + OwnerReferences []OwnerReference `json:"ownerReferences,omitempty"` + + Image string `json:"image,omitempty"` + + Worker string `json:"worker,omitempty"` + + PullState +} + +func (pull *ImagePull) SetVersion(version uint64) { + pull.Version = version +} + +type PullState struct { + Conditions []Condition `json:"conditions,omitempty"` +} diff --git a/pkg/resource/v1/imagepulljob.go b/pkg/resource/v1/imagepulljob.go new file mode 100644 index 00000000..673f8c31 --- /dev/null +++ b/pkg/resource/v1/imagepulljob.go @@ -0,0 +1,36 @@ +package v1 + +type ImagePullJob struct { + Meta + + // UID is a useful field for avoiding data races within a single Name. + // + // It is populated by the Controller when receiving a POST request. + UID string `json:"uid,omitempty"` + + Image string `json:"image,omitempty"` + + Labels Labels `json:"labels,omitempty"` + + ImagePullJobState +} + +func (imagePullJob *ImagePullJob) SetVersion(version uint64) { + imagePullJob.Version = version +} + +func (imagePullJob *ImagePullJob) OwnerReference() OwnerReference { + return OwnerReference{ + Kind: KindImagePullJob, + Name: imagePullJob.Name, + UID: imagePullJob.UID, + } +} + +type ImagePullJobState struct { + Conditions []Condition `json:"conditions,omitempty"` + Progressing int64 `json:"progressing,omitempty"` + Succeeded int64 `json:"succeeded,omitempty"` + Failed int64 `json:"failed,omitempty"` + Total int64 `json:"total,omitempty"` +} diff --git a/pkg/resource/v1/kind.go b/pkg/resource/v1/kind.go new file mode 100644 index 00000000..fa1a693b --- /dev/null +++ b/pkg/resource/v1/kind.go @@ -0,0 +1,7 @@ +package v1 + +type Kind string + +const ( + KindImagePullJob Kind = "ImagePullJob" +) diff --git a/pkg/resource/v1/labels.go b/pkg/resource/v1/labels.go index ce0effc5..aafc966a 100644 --- a/pkg/resource/v1/labels.go +++ b/pkg/resource/v1/labels.go @@ -1,5 +1,10 @@ package v1 +import ( + "fmt" + "strings" +) + type Labels map[string]string func (labels Labels) Contains(other Labels) bool { @@ -11,3 +16,13 @@ func (labels Labels) Contains(other Labels) bool { return true } + +func (labels Labels) String() string { + var kvs []string + + for key, value := range labels { + kvs = append(kvs, fmt.Sprintf("%s: %s", key, value)) + } + + return strings.Join(kvs, ", ") +} diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go index 2fe5b948..e176d35a 100644 --- a/pkg/resource/v1/v1.go +++ b/pkg/resource/v1/v1.go @@ -21,6 +21,17 @@ type Meta struct { Version uint64 `json:"version,omitempty"` } +type OwnerReference struct { + Kind Kind `json:"kind,omitempty"` + + Name string `json:"name,omitempty"` + + // UID is a useful field for avoiding data races within a single Name. + // + // It is populated by the Controller when receiving a POST request. + UID string `json:"uid,omitempty"` +} + type VM struct { Image string `json:"image,omitempty"` ImagePullPolicy ImagePullPolicy `json:"imagePullPolicy,omitempty"` @@ -195,9 +206,10 @@ const ( type ControllerCapability string const ( - ControllerCapabilityRPCV1 ControllerCapability = "rpc-v1" - ControllerCapabilityRPCV2 ControllerCapability = "rpc-v2" - ControllerCapabilityVMStateEndpoint ControllerCapability = "vm-state-endpoint" + ControllerCapabilityRPCV1 ControllerCapability = "rpc-v1" + ControllerCapabilityRPCV2 ControllerCapability = "rpc-v2" + ControllerCapabilityVMStateEndpoint ControllerCapability = "vm-state-endpoint" + ControllerCapabilityImagePullResource ControllerCapability = "pull-resource" ) type ControllerCapabilities []ControllerCapability diff --git a/pkg/resource/v1/vm_condition.go b/pkg/resource/v1/vm_condition.go index fe818e6f..f646159b 100644 --- a/pkg/resource/v1/vm_condition.go +++ b/pkg/resource/v1/vm_condition.go @@ -13,12 +13,19 @@ type Condition struct { type ConditionType string const ( + // VM conditions ConditionTypeScheduled ConditionType = "scheduled" ConditionTypeRunning ConditionType = "running" + // VM conditions (internal to worker) ConditionTypeCloning ConditionType = "cloning" ConditionTypeSuspending ConditionType = "suspending" ConditionTypeStopping ConditionType = "stopping" + + // ImagePull and ImagePullJob conditions + ConditionTypeProgressing ConditionType = "progressing" + ConditionTypeCompleted ConditionType = "completed" + ConditionTypeFailed ConditionType = "failed" ) type ConditionState string