From 9c5b72c2d0b298fc1b6c984e135b74e7a736a7b9 Mon Sep 17 00:00:00 2001 From: Matt Prahl Date: Fri, 31 Jan 2025 15:22:15 -0500 Subject: [PATCH] feat(backend): Allow recurring runs to always use the latest pipeline version (#11560) * Allow recurring runs to always use the latest pipeline version This makes the pipeline_version_id optional and makes the Scheduled Workflow controller levarege the REST API to launch the run rather than rely on compiled Argo Workflows stored in the ScheduledWorkflow object. The previous behavior is preserved if the user is using the v1 API or specifies a pipeline version ID or pipeline spec manifest. Resolves: https://github.com/kubeflow/pipelines/issues/11542 Signed-off-by: mprahl * Add instructions to debug the SWF controller Signed-off-by: mprahl --------- Signed-off-by: mprahl --- .gitignore | 3 + backend/README.md | 33 ++++++++ backend/api/Makefile | 2 + backend/api/README.md | 2 +- backend/api/v1beta1/job.proto | 4 +- .../api/v2beta1/go_client/recurring_run.pb.go | 2 +- backend/api/v2beta1/go_client/run.pb.go | 4 +- .../v2beta1_pipeline_version_reference.go | 2 +- .../v2beta1_pipeline_version_reference.go | 2 +- .../go_http_client/run_model/v2beta1_run.go | 2 +- backend/api/v2beta1/recurring_run.proto | 2 +- backend/api/v2beta1/run.proto | 4 +- .../swagger/kfp_api_single_file.swagger.json | 4 +- .../swagger/recurring_run.swagger.json | 2 +- backend/api/v2beta1/swagger/run.swagger.json | 4 +- backend/src/apiserver/client/sql.go | 5 +- .../client_manager/client_manager.go | 6 ++ backend/src/apiserver/model/pipeline_spec.go | 3 +- .../apiserver/resource/resource_manager.go | 79 +++++++++++++++--- .../resource/resource_manager_test.go | 41 ++++------ backend/src/apiserver/server/api_converter.go | 12 ++- backend/src/apiserver/server/job_server.go | 13 +++ .../src/apiserver/server/job_server_test.go | 14 +++- .../src/apiserver/template/argo_template.go | 28 ++----- backend/src/apiserver/template/template.go | 2 +- backend/src/apiserver/template/v2_template.go | 73 ++++++++++------- backend/src/common/util/service.go | 11 +++ .../scheduledworkflow/controller.go | 81 ++++++++++++++++++- .../crd/controller/scheduledworkflow/main.go | 53 ++++++++++-- .../util/scheduled_workflow.go | 3 +- .../apis/scheduledworkflow/v1beta1/types.go | 4 + .../v2/integration/recurring_run_api_test.go | 67 +++++++++++++++ .../scheduled-workflow/cluster-role.yaml | 6 ++ ...pipeline-scheduledworkflow-deployment.yaml | 11 +++ .../ml-pipeline-scheduledworkflow-role.yaml | 6 ++ 35 files changed, 459 insertions(+), 131 deletions(-) diff --git a/.gitignore b/.gitignore index f221dec7bb3..95936b34f3d 100644 --- a/.gitignore +++ b/.gitignore @@ -86,5 +86,8 @@ __pycache__ # kfp local execution default directory local_outputs/ +# Ignore the Kind cluster kubeconfig +kubeconfig_dev-pipelines-api + # Ignore debug Driver Dockerfile produced from `make -C backend image_driver_debug` backend/Dockerfile.driver-debug diff --git a/backend/README.md b/backend/README.md index a6c0f9b82c4..2b22485025b 100644 --- a/backend/README.md +++ b/backend/README.md @@ -159,6 +159,39 @@ You can also directly connect to the MariaDB database server with: mysql -h 127.0.0.1 -u root ``` +### Scheduled Workflow Development + +If you also want to run the Scheduled Workflow controller locally, stop the controller on the cluster with: + +```bash +kubectl -n kubeflow scale deployment ml-pipeline-scheduledworkflow --replicas=0 +``` + +Then you may leverage the following sample `.vscode/launch.json` file to run the Scheduled Workflow controller locally: + +```json +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Scheduled Workflow controller (Kind)", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/backend/src/crd/controller/scheduledworkflow", + "env": { + "CRON_SCHEDULE_TIMEZONE": "UTC" + }, + "args": [ + "-namespace=kubeflow", + "-kubeconfig=${workspaceFolder}/kubeconfig_dev-pipelines-api", + "-mlPipelineAPIServerName=localhost" + ] + } + ] +} +``` + ### Remote Debug the Driver These instructions assume you are leveraging the Kind cluster in the diff --git a/backend/api/Makefile b/backend/api/Makefile index 4bc5db994e9..9ebc9679ff0 100644 --- a/backend/api/Makefile +++ b/backend/api/Makefile @@ -17,6 +17,8 @@ IMAGE_TAG=kfp-api-generator # Contact chensun or zijianjoy if this remote image needs an update. REMOTE_IMAGE=ghcr.io/kubeflow/kfp-api-generator +# Assume the latest API version by default. +API_VERSION ?= v2beta1 # Keep in sync with the version used in test/release/Dockerfile.release PREBUILT_REMOTE_IMAGE=ghcr.io/kubeflow/kfp-api-generator:1.0 diff --git a/backend/api/README.md b/backend/api/README.md index eb2da5bb600..a5ffb040d4d 100644 --- a/backend/api/README.md +++ b/backend/api/README.md @@ -12,7 +12,7 @@ Tools needed: Set the environment variable `API_VERSION` to the version that you want to generate. We use `v1beta1` as example here. ```bash -export API_VERSION="v1beta1" +export API_VERSION="v2beta1" ``` ## Compiling `.proto` files to Go client and swagger definitions diff --git a/backend/api/v1beta1/job.proto b/backend/api/v1beta1/job.proto index a08a36b22eb..c76b64e9143 100644 --- a/backend/api/v1beta1/job.proto +++ b/backend/api/v1beta1/job.proto @@ -210,9 +210,9 @@ message Job { // Optional input field. Describing the purpose of the job string description = 3; - // Required input field. + // Optional input field. // Describing what the pipeline manifest and parameters to use - // for the scheduled job. + // for the scheduled job. If unset, fetch the pipline_spec at runtime. PipelineSpec pipeline_spec = 4; // Optional input field. Specify which resource this job belongs to. diff --git a/backend/api/v2beta1/go_client/recurring_run.pb.go b/backend/api/v2beta1/go_client/recurring_run.pb.go index 29c1fef7dae..fcc42ce0005 100644 --- a/backend/api/v2beta1/go_client/recurring_run.pb.go +++ b/backend/api/v2beta1/go_client/recurring_run.pb.go @@ -159,7 +159,7 @@ type RecurringRun struct { // Optional input field. Describes the purpose of the recurring run. Description string `protobuf:"bytes,3,opt,name=description,proto3" json:"description,omitempty"` // Required input field. Specifies the source of the pipeline spec for this - // recurring run. Can be either a pipeline version id, or a pipeline spec. + // recurring run. Can be either a pipeline id, pipeline version id, or a pipeline spec. // // Types that are assignable to PipelineSource: // diff --git a/backend/api/v2beta1/go_client/run.pb.go b/backend/api/v2beta1/go_client/run.pb.go index 4713fee5ec9..ac0d74d088b 100644 --- a/backend/api/v2beta1/go_client/run.pb.go +++ b/backend/api/v2beta1/go_client/run.pb.go @@ -411,7 +411,7 @@ type Run_PipelineSpec struct { } type Run_PipelineVersionReference struct { - // Reference to a pipeline version containing pipeline_id and pipeline_version_id. + // Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id. PipelineVersionReference *PipelineVersionReference `protobuf:"bytes,18,opt,name=pipeline_version_reference,json=pipelineVersionReference,proto3,oneof"` } @@ -429,7 +429,7 @@ type PipelineVersionReference struct { // Input. Required. Unique ID of the parent pipeline. PipelineId string `protobuf:"bytes,1,opt,name=pipeline_id,json=pipelineId,proto3" json:"pipeline_id,omitempty"` - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. PipelineVersionId string `protobuf:"bytes,2,opt,name=pipeline_version_id,json=pipelineVersionId,proto3" json:"pipeline_version_id,omitempty"` } diff --git a/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go b/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go index 6c4adb9d62e..615a11c457e 100644 --- a/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go +++ b/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go @@ -18,7 +18,7 @@ type V2beta1PipelineVersionReference struct { // Input. Required. Unique ID of the parent pipeline. PipelineID string `json:"pipeline_id,omitempty"` - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. PipelineVersionID string `json:"pipeline_version_id,omitempty"` } diff --git a/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go b/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go index e817451c66c..48f17f81926 100644 --- a/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go +++ b/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go @@ -18,7 +18,7 @@ type V2beta1PipelineVersionReference struct { // Input. Required. Unique ID of the parent pipeline. PipelineID string `json:"pipeline_id,omitempty"` - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. PipelineVersionID string `json:"pipeline_version_id,omitempty"` } diff --git a/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go b/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go index c7a012c57c6..d232fdd05ca 100644 --- a/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go +++ b/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go @@ -49,7 +49,7 @@ type V2beta1Run struct { // This field is Deprecated. The pipeline version id is under pipeline_version_reference for v2. PipelineVersionID string `json:"pipeline_version_id,omitempty"` - // Reference to a pipeline version containing pipeline_id and pipeline_version_id. + // Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id. PipelineVersionReference *V2beta1PipelineVersionReference `json:"pipeline_version_reference,omitempty"` // ID of the recurring run that triggered this run. diff --git a/backend/api/v2beta1/recurring_run.proto b/backend/api/v2beta1/recurring_run.proto index 09cec8e200f..66b810901a2 100644 --- a/backend/api/v2beta1/recurring_run.proto +++ b/backend/api/v2beta1/recurring_run.proto @@ -89,7 +89,7 @@ message RecurringRun { string description = 3; // Required input field. Specifies the source of the pipeline spec for this - // recurring run. Can be either a pipeline version id, or a pipeline spec. + // recurring run. Can be either a pipeline id, pipeline version id, or a pipeline spec. oneof pipeline_source { // This field is Deprecated. The pipeline version id is under pipeline_version_reference for v2. string pipeline_version_id = 4 [deprecated=true]; diff --git a/backend/api/v2beta1/run.proto b/backend/api/v2beta1/run.proto index 040abb4a280..5c48ab19317 100644 --- a/backend/api/v2beta1/run.proto +++ b/backend/api/v2beta1/run.proto @@ -168,7 +168,7 @@ message Run { // Pipeline spec. google.protobuf.Struct pipeline_spec = 7; - // Reference to a pipeline version containing pipeline_id and pipeline_version_id. + // Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id. PipelineVersionReference pipeline_version_reference = 18; } @@ -213,7 +213,7 @@ message PipelineVersionReference { // Input. Required. Unique ID of the parent pipeline. string pipeline_id = 1; - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. string pipeline_version_id = 2; } diff --git a/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json b/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json index 37ac61bdb21..6fc88e10f4b 100644 --- a/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json +++ b/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json @@ -2020,7 +2020,7 @@ }, "pipeline_version_id": { "type": "string", - "description": "Input. Required. Unique ID of an existing pipeline version." + "description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used." } }, "description": "Reference to an existing pipeline version." @@ -2349,7 +2349,7 @@ }, "pipeline_version_reference": { "$ref": "#/definitions/v2beta1PipelineVersionReference", - "description": "Reference to a pipeline version containing pipeline_id and pipeline_version_id." + "description": "Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id." }, "runtime_config": { "$ref": "#/definitions/v2beta1RuntimeConfig", diff --git a/backend/api/v2beta1/swagger/recurring_run.swagger.json b/backend/api/v2beta1/swagger/recurring_run.swagger.json index dfc0a80c19f..48e811bd1b6 100644 --- a/backend/api/v2beta1/swagger/recurring_run.swagger.json +++ b/backend/api/v2beta1/swagger/recurring_run.swagger.json @@ -390,7 +390,7 @@ }, "pipeline_version_id": { "type": "string", - "description": "Input. Required. Unique ID of an existing pipeline version." + "description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used." } }, "description": "Reference to an existing pipeline version." diff --git a/backend/api/v2beta1/swagger/run.swagger.json b/backend/api/v2beta1/swagger/run.swagger.json index b71fd939049..d04bda64213 100644 --- a/backend/api/v2beta1/swagger/run.swagger.json +++ b/backend/api/v2beta1/swagger/run.swagger.json @@ -619,7 +619,7 @@ }, "pipeline_version_id": { "type": "string", - "description": "Input. Required. Unique ID of an existing pipeline version." + "description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used." } }, "description": "Reference to an existing pipeline version." @@ -667,7 +667,7 @@ }, "pipeline_version_reference": { "$ref": "#/definitions/v2beta1PipelineVersionReference", - "description": "Reference to a pipeline version containing pipeline_id and pipeline_version_id." + "description": "Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id." }, "runtime_config": { "$ref": "#/definitions/v2beta1RuntimeConfig", diff --git a/backend/src/apiserver/client/sql.go b/backend/src/apiserver/client/sql.go index 026ef056190..3a111eb3afc 100644 --- a/backend/src/apiserver/client/sql.go +++ b/backend/src/apiserver/client/sql.go @@ -22,8 +22,9 @@ import ( ) const ( - MYSQL_TEXT_FORMAT string = "longtext not null" - MYSQL_EXIST_ERROR string = "database exists" + MYSQL_TEXT_FORMAT string = "longtext not null" + MYSQL_TEXT_FORMAT_NULL string = "longtext" + MYSQL_EXIST_ERROR string = "database exists" PGX_TEXT_FORMAT string = "text" PGX_EXIST_ERROR string = "already exists" diff --git a/backend/src/apiserver/client_manager/client_manager.go b/backend/src/apiserver/client_manager/client_manager.go index 2368666e3ff..9857aac68d5 100644 --- a/backend/src/apiserver/client_manager/client_manager.go +++ b/backend/src/apiserver/client_manager/client_manager.go @@ -345,6 +345,12 @@ func InitDBClient(initConnectionTimeout time.Duration) *storage.DB { if ignoreAlreadyExistError(driverName, response.Error) != nil { glog.Fatalf("Failed to create a foreign key for RunUUID in task table. Error: %s", response.Error) } + + // This is a workaround because AutoMigration does not detect that the column went from not null to nullable. + response = db.Model(&model.Job{}).ModifyColumn("WorkflowSpecManifest", client.MYSQL_TEXT_FORMAT_NULL) + if response.Error != nil { + glog.Fatalf("Failed to make the WorkflowSpecManifest column nullable on jobs. Error: %s", response.Error) + } default: glog.Fatalf("Driver %v is not supported, use \"mysql\" for MySQL, or \"pgx\" for PostgreSQL", driverName) } diff --git a/backend/src/apiserver/model/pipeline_spec.go b/backend/src/apiserver/model/pipeline_spec.go index b0fb4184119..2f92c5c68bd 100644 --- a/backend/src/apiserver/model/pipeline_spec.go +++ b/backend/src/apiserver/model/pipeline_spec.go @@ -33,7 +33,8 @@ type PipelineSpec struct { PipelineSpecManifest string `gorm:"column:PipelineSpecManifest; size:33554432;"` // Argo workflow YAML definition. This is the Argo Spec converted from Pipeline YAML. - WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; not null; size:33554432;"` + // This is deprecated. Use the pipeline ID, pipeline version ID, or pipeline spec manifest. + WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; size:33554432;"` // Store parameters key-value pairs as serialized string. // This field is only used for V1 API. For V2, use the `Parameters` field in RuntimeConfig. diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index c183aa7799f..6523496e012 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -590,6 +590,12 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error { default: } + // If the pipeline isn't pinned, skip it. The runs API is used directly by the ScheduledWorkflow controller + // in this case with just the pipeline ID and optionally the pipeline version ID. + if jobs[i].PipelineSpec.PipelineSpecManifest == "" && jobs[i].PipelineSpec.WorkflowSpecManifest == "" { + continue + } + tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) if err != nil { return failedToReconcileSwfCrsError(err) @@ -1041,13 +1047,6 @@ func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec mode // Manifest's namespace gets overwritten with the job.Namespace if the later is non-empty. // Otherwise, job.Namespace gets overwritten by the manifest. func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model.Job, error) { - // Create a template based on the manifest of an existing pipeline version or used-provided manifest. - // Update the job.PipelineSpec if an existing pipeline version is used. - tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec) - if err != nil { - return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest") - } - // Create a new ScheduledWorkflow at the ScheduledWorkflow client. k8sNamespace := job.Namespace if k8sNamespace == "" { @@ -1059,12 +1058,63 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model job.Namespace = k8sNamespace - // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). - // Convert modelJob into scheduledWorkflow. - scheduledWorkflow, err := tmpl.ScheduledWorkflow(job) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + var manifest string + var scheduledWorkflow *scheduledworkflow.ScheduledWorkflow + var tmpl template.Template + + // If the pipeline version or pipeline spec is provided, this means the user wants to pin to a specific pipeline. + // Otherwise, always let the ScheduledWorkflow controller pick the latest. + if job.PipelineVersionId != "" || job.PipelineSpecManifest != "" || job.WorkflowSpecManifest != "" { + var err error + // Create a template based on the manifest of an existing pipeline version or used-provided manifest. + // Update the job.PipelineSpec if an existing pipeline version is used. + tmpl, manifest, err = r.fetchTemplateFromPipelineSpec(&job.PipelineSpec) + if err != nil { + return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest") + } + + // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). + // Convert modelJob into scheduledWorkflow. + scheduledWorkflow, err = tmpl.ScheduledWorkflow(job) + if err != nil { + return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + } + } else if job.PipelineId == "" { + return nil, errors.New("Cannot create a job with an empty pipeline ID") + } else { + // Validate the input parameters on the latest pipeline version. The latest pipeline version is not stored + // in the ScheduledWorkflow. It's just to help the user with up front validation at recurring run creation + // time. + manifest, err := r.GetPipelineLatestTemplate(job.PipelineId) + if err != nil { + return nil, util.Wrap(err, "Failed to validate the input parameters on the latest pipeline version") + } + + tmpl, err := template.New(manifest) + if err != nil { + return nil, util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest") + } + + _, err = tmpl.ScheduledWorkflow(job) + if err != nil { + return nil, util.Wrap(err, "Failed to validate the input parameters on the latest pipeline version") + } + + scheduledWorkflow, err = template.NewGenericScheduledWorkflow(job) + if err != nil { + return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + } + + parameters, err := template.StringMapToCRDParameters(job.RuntimeConfig.Parameters) + if err != nil { + return nil, util.Wrap(err, "Converting runtime config's parameters to CDR parameters failed") + } + + scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{ + Parameters: parameters, PipelineRoot: job.PipelineRoot, + } } + newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { @@ -1080,6 +1130,11 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model for _, modelRef := range job.ResourceReferences { modelRef.ResourceUUID = string(swf.UID) } + + if tmpl == nil { + return r.jobStore.CreateJob(job) + } + if tmpl.GetTemplateType() == template.V1 { // Get the service account serviceAccount := "" diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 3a29893fec5..4271e4f05fd 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -2366,37 +2366,25 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) { pipelineStore, ok := store.pipelineStore.(*storage.PipelineStore) assert.True(t, ok) pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil)) - pv := createPipelineVersion( - pipeline.UUID, - "version_for_job", - "", - "", - testWorkflow.ToStringForStore(), - "", - "", - ) - version, err := manager.CreatePipelineVersion(pv) - assert.Nil(t, err) // The pipeline specified via pipeline id will be converted to this // pipeline's default version, which will be used to create run. newJob, err := manager.CreateJob(context.Background(), job) expectedJob := &model.Job{ - UUID: "123e4567-e89b-12d3-a456-426655440000", - DisplayName: "j1", - K8SName: "job-", - Namespace: "ns1", - ServiceAccount: "pipeline-runner", + UUID: "123e4567-e89b-12d3-a456-426655440000", + DisplayName: "j1", + K8SName: "job-", + Namespace: "ns1", + // Since there is no pipeline version or service account specified, the API server will select the service + // account when compiling the run, not within the ScheduledWorkflow. + ServiceAccount: "", Enabled: true, - CreatedAtInSec: 5, - UpdatedAtInSec: 5, + CreatedAtInSec: 4, + UpdatedAtInSec: 4, Conditions: "STATUS_UNSPECIFIED", PipelineSpec: model.PipelineSpec{ - PipelineId: pipeline.UUID, - PipelineName: version.Name, - PipelineVersionId: version.UUID, - WorkflowSpecManifest: testWorkflow.ToStringForStore(), - Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", + PipelineId: pipeline.UUID, + Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", }, ExperimentId: experiment.UUID, } @@ -2512,6 +2500,7 @@ func TestCreateJob_ThroughPipelineIdAndPipelineVersion(t *testing.T) { } func TestCreateJob_EmptyPipelineSpec(t *testing.T) { + initEnvVars() store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close() manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false}) @@ -2526,7 +2515,11 @@ func TestCreateJob_EmptyPipelineSpec(t *testing.T) { } _, err := manager.CreateJob(context.Background(), job) assert.NotNil(t, err) - assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest") + errMsg := "" + if err != nil { + errMsg = err.Error() + } + assert.Contains(t, errMsg, "Cannot create a job with an empty pipeline ID") } func TestCreateJob_InvalidWorkflowSpec(t *testing.T) { diff --git a/backend/src/apiserver/server/api_converter.go b/backend/src/apiserver/server/api_converter.go index bdf55811de3..e6351b1397a 100644 --- a/backend/src/apiserver/server/api_converter.go +++ b/backend/src/apiserver/server/api_converter.go @@ -1869,10 +1869,13 @@ func toModelJob(j interface{}) (*model.Job, error) { case *apiv2beta1.RecurringRun: pipelineId = apiJob.GetPipelineVersionReference().GetPipelineId() pipelineVersionId = apiJob.GetPipelineVersionReference().GetPipelineVersionId() - if spec, err := pipelineSpecStructToYamlString(apiJob.GetPipelineSpec()); err == nil { - pipelineSpec = spec - } else { - return nil, util.Wrap(err, "Failed to convert API recurring run to its internal representation due to pipeline spec conversion error") + + if apiJob.GetPipelineSpec() != nil { + if spec, err := pipelineSpecStructToYamlString(apiJob.GetPipelineSpec()); err == nil { + pipelineSpec = spec + } else { + return nil, util.Wrap(err, "Failed to convert API recurring run to its internal representation due to pipeline spec conversion error") + } } cfg, err := toModelRuntimeConfig(apiJob.GetRuntimeConfig()) @@ -1933,6 +1936,7 @@ func toModelJob(j interface{}) (*model.Job, error) { } else if pipelineVersionId != "" { pipelineName = fmt.Sprintf("pipelines/%v", pipelineVersionId) } + status := model.StatusStateUnspecified if isEnabled { status = model.StatusStateEnabled diff --git a/backend/src/apiserver/server/job_server.go b/backend/src/apiserver/server/job_server.go index d78db2aff76..557287ee62f 100644 --- a/backend/src/apiserver/server/job_server.go +++ b/backend/src/apiserver/server/job_server.go @@ -115,6 +115,19 @@ func (s *JobServer) CreateJob(ctx context.Context, request *apiv1beta1.CreateJob if err != nil { return nil, util.Wrap(err, "Failed to create a recurring run due to conversion error") } + + // In the v2 API, the pipeline version being empty means always pick the latest at run submission time. In v1, + // it means to use the latest pipeline version at recurring run creation time. Handle this case here since + // modelJob does not have the concept of which API version it came from. + if modelJob.PipelineId != "" && modelJob.WorkflowSpecManifest == "" && modelJob.PipelineSpecManifest == "" && modelJob.PipelineVersionId == "" { + pipelineVersion, err := s.resourceManager.GetLatestPipelineVersion(modelJob.PipelineId) + if err != nil { + return nil, util.Wrapf(err, "Failed to fetch a pipeline version from pipeline %v", modelJob.PipelineId) + } + + modelJob.PipelineVersionId = pipelineVersion.UUID + } + newJob, err := s.createJob(ctx, modelJob) if err != nil { return nil, util.Wrap(err, "Failed to create a recurring run") diff --git a/backend/src/apiserver/server/job_server_test.go b/backend/src/apiserver/server/job_server_test.go index f9dc4b80a77..ebe81973296 100644 --- a/backend/src/apiserver/server/job_server_test.go +++ b/backend/src/apiserver/server/job_server_test.go @@ -148,10 +148,11 @@ func TestCreateJob_WrongInput(t *testing.T) { Trigger: &apiv1beta1.Trigger_CronSchedule{CronSchedule: &apiv1beta1.CronSchedule{ StartTime: ×tamp.Timestamp{Seconds: 1}, Cron: "1 * * * *", - }}}, + }}, + }, ResourceReferences: validReference, }, - "Failed to fetch a template with an empty pipeline spec manifest", + "Failed to create a recurring run: Cannot create a job with an empty pipeline ID", }, { "invalid pipeline spec", @@ -172,7 +173,8 @@ func TestCreateJob_WrongInput(t *testing.T) { {Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_EXPERIMENT, Id: experiment.UUID}, Relationship: apiv1beta1.Relationship_OWNER}, }, }, - "Failed to get the latest pipeline version as pipeline was not found: ResourceNotFoundError: Pipeline not_exist_pipeline not found", + "Failed to fetch a pipeline version from pipeline not_exist_pipeline: Failed to get the latest " + + "pipeline version as pipeline was not found: ResourceNotFoundError: Pipeline not_exist_pipeline not found", }, { "invalid cron", @@ -240,7 +242,11 @@ func TestCreateJob_WrongInput(t *testing.T) { for _, tt := range tests { got, err := server.CreateJob(context.Background(), &apiv1beta1.CreateJobRequest{Job: tt.arg}) assert.NotNil(t, err) - assert.Contains(t, err.Error(), tt.errMsg) + errMsg := "" + if err != nil { + errMsg = err.Error() + } + assert.Contains(t, errMsg, tt.errMsg) assert.Nil(t, got) } } diff --git a/backend/src/apiserver/template/argo_template.go b/backend/src/apiserver/template/argo_template.go index 90a04f6bd5d..721d9cd8fc3 100644 --- a/backend/src/apiserver/template/argo_template.go +++ b/backend/src/apiserver/template/argo_template.go @@ -23,7 +23,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" ) @@ -113,10 +112,6 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu setDefaultServiceAccount(workflow, modelJob.ServiceAccount) // Disable istio sidecar injection if not specified workflow.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled) - swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) - if err != nil { - return nil, util.Wrap(err, "Create job failed") - } // Marking auto-added artifacts as optional. Otherwise most older workflows will start failing after upgrade to Argo 2.3. // TODO: Fix the components to explicitly declare the artifacts they really output. @@ -127,28 +122,17 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu if err != nil { return nil, util.Wrap(err, "Failed to convert v1 parameters to CRD parameters") } - crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) + + scheduledWorkflow, err := NewGenericScheduledWorkflow(modelJob) if err != nil { return nil, err } - scheduledWorkflow := &scheduledworkflow.ScheduledWorkflow{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "kubeflow.org/v1beta1", - Kind: "ScheduledWorkflow", - }, - ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, - Spec: scheduledworkflow.ScheduledWorkflowSpec{ - Enabled: modelJob.Enabled, - MaxConcurrency: &modelJob.MaxConcurrency, - Trigger: crdTrigger, - Workflow: &scheduledworkflow.WorkflowResource{ - Parameters: swfParameters, - Spec: workflow.ToStringForSchedule(), - }, - NoCatchup: util.BoolPointer(modelJob.NoCatchup), - }, + scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{ + Parameters: swfParameters, + Spec: workflow.ToStringForSchedule(), } + return scheduledWorkflow, nil } diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 738838c1651..f793dbc646f 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -169,7 +169,7 @@ func modelToPipelineJobRuntimeConfig(modelRuntimeConfig *model.RuntimeConfig) (* // Assumes that the serialized parameters will take a form of // map[string]*structpb.Value, which works for runtimeConfig.Parameters such as // {"param1":"value1","param2":"value2"}. -func stringMapToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, error) { +func StringMapToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, error) { var swParams []scheduledworkflow.Parameter var parameters map[string]*structpb.Value if modelParams == "" { diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index 1055bcdf8a9..df2ff9e3a13 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -41,9 +41,38 @@ type V2Spec struct { platformSpec *pipelinespec.PlatformSpec } -var ( - Launcher = "" -) +var Launcher = "" + +func NewGenericScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { + swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) + if err != nil { + return nil, util.Wrap(err, "Create job failed") + } + + crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) + if err != nil { + return nil, util.Wrap(err, "converting model trigger to crd trigger failed") + } + + return &scheduledworkflow.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, + ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, + Spec: scheduledworkflow.ScheduledWorkflowSpec{ + Enabled: modelJob.Enabled, + MaxConcurrency: &modelJob.MaxConcurrency, + Trigger: crdTrigger, + NoCatchup: util.BoolPointer(modelJob.NoCatchup), + ExperimentId: modelJob.ExperimentId, + PipelineId: modelJob.PipelineId, + PipelineName: modelJob.PipelineName, + PipelineVersionId: modelJob.PipelineVersionId, + ServiceAccount: modelJob.ServiceAccount, + }, + }, nil +} // Converts modelJob to ScheduledWorkflow. func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { @@ -102,41 +131,23 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } // Disable istio sidecar injection if not specified executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled) - swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) - if err != nil { - return nil, util.Wrap(err, "Create job failed") - } - parameters, err := stringMapToCRDParameters(modelJob.RuntimeConfig.Parameters) + parameters, err := StringMapToCRDParameters(modelJob.RuntimeConfig.Parameters) if err != nil { return nil, util.Wrap(err, "Converting runtime config's parameters to CDR parameters failed") } - crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) + + scheduledWorkflow, err := NewGenericScheduledWorkflow(modelJob) if err != nil { - return nil, util.Wrap(err, "converting model trigger to crd trigger failed") + return nil, err } - scheduledWorkflow := &scheduledworkflow.ScheduledWorkflow{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "kubeflow.org/v2beta1", - Kind: "ScheduledWorkflow", - }, - ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, - Spec: scheduledworkflow.ScheduledWorkflowSpec{ - Enabled: modelJob.Enabled, - MaxConcurrency: &modelJob.MaxConcurrency, - Trigger: crdTrigger, - Workflow: &scheduledworkflow.WorkflowResource{ - Parameters: parameters, - Spec: executionSpec.ToStringForSchedule(), - }, - NoCatchup: util.BoolPointer(modelJob.NoCatchup), - ExperimentId: modelJob.ExperimentId, - PipelineId: modelJob.PipelineId, - PipelineName: modelJob.PipelineName, - PipelineVersionId: modelJob.PipelineVersionId, - ServiceAccount: executionSpec.ServiceAccount(), - }, + scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{ + Parameters: parameters, + Spec: executionSpec.ToStringForSchedule(), } + + scheduledWorkflow.Spec.ServiceAccount = executionSpec.ServiceAccount() + return scheduledWorkflow, nil } diff --git a/backend/src/common/util/service.go b/backend/src/common/util/service.go index cf0f5379a38..c5624b64229 100644 --- a/backend/src/common/util/service.go +++ b/backend/src/common/util/service.go @@ -15,6 +15,7 @@ package util import ( + "context" "fmt" "net/http" "strings" @@ -94,6 +95,16 @@ func GetKubernetesClientFromClientConfig(clientConfig clientcmd.ClientConfig) ( return clientSet, config, namespace, nil } +func GetRpcConnectionWithTimeout(address string, timeout time.Time) (*grpc.ClientConn, error) { + ctx, _ := context.WithDeadline(context.Background(), timeout) + + conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create gRPC connection") + } + return conn, nil +} + func GetRpcConnection(address string) (*grpc.ClientConn, error) { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { diff --git a/backend/src/crd/controller/scheduledworkflow/controller.go b/backend/src/crd/controller/scheduledworkflow/controller.go index 51581c47e46..d11d537ac5f 100644 --- a/backend/src/crd/controller/scheduledworkflow/controller.go +++ b/backend/src/crd/controller/scheduledworkflow/controller.go @@ -20,6 +20,7 @@ import ( "time" workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + api "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client" commonutil "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/client" "github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util" @@ -30,6 +31,8 @@ import ( swfinformers "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/informers/externalversions" wraperror "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/structpb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -40,6 +43,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" ) @@ -60,6 +64,7 @@ type Controller struct { kubeClient *client.KubeClient swfClient *client.ScheduledWorkflowClient workflowClient *client.WorkflowClient + runClient api.RunServiceClient // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This @@ -73,6 +78,10 @@ type Controller struct { // the timezone loation which the scheduled will use location *time.Location + + // tokenSrc provides a way to get the latest refreshed token when authentication to the REST API server is enabled. + // This will be nil when authentication is not enabled. + tokenSrc transport.ResettableTokenSource } // NewController returns a new sample controller @@ -80,10 +89,12 @@ func NewController( kubeClientSet kubernetes.Interface, swfClientSet swfclientset.Interface, workflowClientSet commonutil.ExecutionClient, + runClient api.RunServiceClient, swfInformerFactory swfinformers.SharedInformerFactory, executionInformer commonutil.ExecutionInformer, time commonutil.TimeInterface, location *time.Location, + tokenSrc transport.ResettableTokenSource, ) (*Controller, error) { // obtain references to shared informers swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows() @@ -102,11 +113,13 @@ func NewController( controller := &Controller{ kubeClient: client.NewKubeClient(kubeClientSet, recorder), swfClient: client.NewScheduledWorkflowClient(swfClientSet, swfInformer), + runClient: runClient, workflowClient: client.NewWorkflowClient(workflowClientSet, executionInformer), workqueue: workqueue.NewNamedRateLimitingQueue( workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), swfregister.Kind), time: time, location: location, + tokenSrc: tokenSrc, } log.Info("Setting up event handlers") @@ -507,12 +520,72 @@ func (c *Controller) submitNewWorkflowIfNotAlreadySubmitted( } // If the workflow is not found, we need to create it. - newWorkflow, err := swf.NewWorkflow(nextScheduledEpoch, nowEpoch) - createdWorkflow, err := c.workflowClient.Create(ctx, swf.Namespace, newWorkflow) + if swf.Spec.Workflow != nil && swf.Spec.Workflow.Spec != nil { + newWorkflow, err := swf.NewWorkflow(nextScheduledEpoch, nowEpoch) + if err != nil { + return false, "", err + } + + createdWorkflow, err := c.workflowClient.Create(ctx, swf.Namespace, newWorkflow) + if err != nil { + return false, "", err + } + return true, createdWorkflow.ExecutionName(), nil + } + + if c.tokenSrc != nil { + token, err := c.tokenSrc.Token() + if err != nil { + return false, "", fmt.Errorf("Failed to get a token to communicate with the REST API: %w", err) + } + + ctx = metadata.AppendToOutgoingContext(ctx, "Authorization", "Bearer "+token.AccessToken) + } + + var runtimeConfig *api.RuntimeConfig + + if swf.Spec.Workflow != nil { + runtimeConfig = &api.RuntimeConfig{ + Parameters: map[string]*structpb.Value{}, + PipelineRoot: swf.Spec.Workflow.PipelineRoot, + } + + for _, param := range swf.Spec.Workflow.Parameters { + val := &structpb.Value{} + + err := val.UnmarshalJSON([]byte(param.Value)) + if err != nil { + return false, "", err + } + + runtimeConfig.Parameters[param.Name] = val + } + } + + run, err := c.runClient.CreateRun(ctx, &api.CreateRunRequest{ + ExperimentId: swf.Spec.ExperimentId, + Run: &api.Run{ + ExperimentId: swf.Spec.ExperimentId, + DisplayName: swf.NextResourceName(), + RecurringRunId: string(swf.UID), + RuntimeConfig: runtimeConfig, + PipelineSource: &api.Run_PipelineVersionReference{ + PipelineVersionReference: &api.PipelineVersionReference{ + PipelineId: swf.Spec.PipelineId, + // This can be empty, which causes the latest pipeline version to be selected. + PipelineVersionId: swf.Spec.PipelineVersionId, + }, + }, + ServiceAccount: swf.Spec.ServiceAccount, + }, + }) if err != nil { - return false, "", err + return false, "", fmt.Errorf( + "failed to create a run from the scheduled workflow (%s/%s): %w", swf.Namespace, swf.Name, err, + ) } - return true, createdWorkflow.ExecutionName(), nil + + return true, run.DisplayName, nil } func (c *Controller) updateStatus( diff --git a/backend/src/crd/controller/scheduledworkflow/main.go b/backend/src/crd/controller/scheduledworkflow/main.go index e4d176a2a1f..4cb178e0038 100644 --- a/backend/src/crd/controller/scheduledworkflow/main.go +++ b/backend/src/crd/controller/scheduledworkflow/main.go @@ -16,9 +16,12 @@ package main import ( "flag" + "fmt" + "os" "strings" "time" + api "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client" commonutil "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util" swfclientset "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned" @@ -29,16 +32,27 @@ import ( "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/transport" ) var ( - logLevel string - masterURL string - kubeconfig string - namespace string - location *time.Location - clientQPS float64 - clientBurst int + logLevel string + masterURL string + kubeconfig string + namespace string + location *time.Location + clientQPS float64 + clientBurst int + mlPipelineAPIServerName string + mlPipelineServiceGRPCPort string +) + +const ( + // These flags match the persistence agent + mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath" + mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName" + mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort" + apiTokenFile = "/var/run/secrets/kubeflow/tokens/scheduledworkflow-sa-token" ) func main() { @@ -85,14 +99,35 @@ func main() { scheduleInformerFactory = swfinformers.NewFilteredSharedInformerFactory(scheduleClient, time.Second*30, namespace, nil) } + grpcAddress := fmt.Sprintf("%s:%s", mlPipelineAPIServerName, mlPipelineServiceGRPCPort) + + log.Infof("Connecting the API server over GRPC at: %s", grpcAddress) + apiConnection, err := commonutil.GetRpcConnectionWithTimeout(grpcAddress, time.Now().Add(time.Minute)) + if err != nil { + log.Fatalf("Error connecting to the API server after trying for one minute: %v", err) + } + + var tokenSrc transport.ResettableTokenSource + + if _, err := os.Stat(apiTokenFile); err == nil { + tokenSrc = transport.NewCachedFileTokenSource(apiTokenFile) + } + + runClient := api.NewRunServiceClient(apiConnection) + + log.Info("Successfully connected to the API server") + controller, err := NewController( kubeClient, scheduleClient, execClient, + runClient, scheduleInformerFactory, execInformer, commonutil.NewRealTime(), - location) + location, + tokenSrc, + ) if err != nil { log.Fatalf("Failed to instantiate the controller: %v", err) } @@ -123,6 +158,8 @@ func init() { // Use default value of client QPS (5) & burst (10) defined in // k8s.io/client-go/rest/config.go#RESTClientFor flag.Float64Var(&clientQPS, "clientQPS", 5, "The maximum QPS to the master from this client.") + flag.StringVar(&mlPipelineAPIServerName, mlPipelineAPIServerNameFlagName, "ml-pipeline", "Name of the ML pipeline API server.") + flag.StringVar(&mlPipelineServiceGRPCPort, mlPipelineAPIServerGRPCPortFlagName, "8887", "GRPC Port of the ML pipeline API server.") flag.IntVar(&clientBurst, "clientBurst", 10, "Maximum burst for throttle from this client.") var err error location, err = util.GetLocation() diff --git a/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go b/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go index 3e1aac51211..ea4bfe2b8f1 100644 --- a/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go +++ b/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go @@ -17,12 +17,13 @@ package util import ( "fmt" "hash/fnv" - corev1 "k8s.io/api/core/v1" "math" "sort" "strconv" "time" + corev1 "k8s.io/api/core/v1" + commonutil "github.com/kubeflow/pipelines/backend/src/common/util" swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go b/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go index 66cc7692707..3de0d6dae15 100644 --- a/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go +++ b/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go @@ -108,10 +108,14 @@ type WorkflowResource struct { Parameters []Parameter `json:"parameters,omitempty"` + PipelineRoot string `json:"pipelineRoot,omitempty"` + // Specification of the workflow to start. // Use interface{} for backward compatibility // TODO: change it to string and avoid type casting // after several releases + // This is deprecated. In a future release, this will be ignored and this will be compiled by the API server + // at runtime. Spec interface{} `json:"spec,omitempty"` } diff --git a/backend/test/v2/integration/recurring_run_api_test.go b/backend/test/v2/integration/recurring_run_api_test.go index cfb0a5a245d..0cf92365515 100644 --- a/backend/test/v2/integration/recurring_run_api_test.go +++ b/backend/test/v2/integration/recurring_run_api_test.go @@ -373,6 +373,73 @@ func (s *RecurringRunApiTestSuite) TestRecurringRunApis() { } } +func (s *RecurringRunApiTestSuite) TestRecurringRunApisUseLatest() { + t := s.T() + + /* ---------- Upload pipelines YAML ---------- */ + helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams()) + assert.Nil(t, err) + + /* ---------- Upload pipeline version YAML ---------- */ + time.Sleep(1 * time.Second) + helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion( + "../resources/hello-world.yaml", &upload_params.UploadPipelineVersionParams{ + Name: util.StringPointer("hello-world-version"), + Pipelineid: util.StringPointer(helloWorldPipeline.PipelineID), + }) + assert.Nil(t, err) + + /* ---------- Create a new hello world experiment ---------- */ + experiment := test.MakeExperiment("hello world experiment", "", s.resourceNamespace) + helloWorldExperiment, err := s.experimentClient.Create(&experiment_params.ExperimentServiceCreateExperimentParams{Body: experiment}) + assert.Nil(t, err) + + /* ---------- Create a new hello world recurringRun by specifying pipeline ID without a version ---------- */ + createRecurringRunRequest := &recurring_run_params.RecurringRunServiceCreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{ + DisplayName: "hello world with latest pipeline version", + Description: "this is hello world", + ExperimentID: helloWorldExperiment.ExperimentID, + PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{ + PipelineID: helloWorldPipelineVersion.PipelineID, + }, + MaxConcurrency: 10, + Mode: recurring_run_model.RecurringRunModeENABLE, + }} + helloWorldRecurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest) + assert.Nil(t, err) + + // The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent. + // This could take a few seconds to finish. + + /* ---------- Check run for hello world recurringRun ---------- */ + var helloWorldRun *run_model.V2beta1Run + + if err := retrier.New(retrier.ConstantBackoff(8, 5*time.Second), nil).Run(func() error { + runs, totalSize, _, err := s.runClient.List(&run_params.RunServiceListRunsParams{ + ExperimentID: util.StringPointer(helloWorldExperiment.ExperimentID), + }) + if err != nil { + return err + } + if len(runs) != 1 { + return fmt.Errorf("expected runs to be length 1, got: %v", len(runs)) + } + if totalSize != 1 { + return fmt.Errorf("expected total size 1, got: %v", totalSize) + } + helloWorldRun = runs[0] + return s.checkHelloWorldRun(helloWorldRun, helloWorldExperiment.ExperimentID, helloWorldRecurringRun.RecurringRunID) + }); err != nil { + assert.Nil(t, err) + assert.FailNow(t, "Timed out waiting for the recurring run") + } + + // Verify the latest pipeline version was selected + assert.Equal( + t, helloWorldPipelineVersion.PipelineVersionID, helloWorldRun.PipelineVersionReference.PipelineVersionID, + ) +} + func (s *RecurringRunApiTestSuite) TestRecurringRunApis_noCatchupOption() { t := s.T() diff --git a/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml b/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml index fd868eaad07..45a10bb45ec 100644 --- a/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml +++ b/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml @@ -15,6 +15,12 @@ rules: - update - patch - delete +- apiGroups: + - pipelines.kubeflow.org + resources: + - runs + verbs: + - create - apiGroups: - kubeflow.org resources: diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml index 045a0882302..6bec64a139a 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml @@ -41,4 +41,15 @@ spec: capabilities: drop: - ALL + volumeMounts: + - mountPath: /var/run/secrets/kubeflow/tokens + name: scheduledworkflow-sa-token serviceAccountName: ml-pipeline-scheduledworkflow + volumes: + - name: scheduledworkflow-sa-token + projected: + sources: + - serviceAccountToken: + path: scheduledworkflow-sa-token + expirationSeconds: 3600 + audience: pipelines.kubeflow.org diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml index 36729d74ed3..c6f4918f1d8 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml @@ -30,6 +30,12 @@ rules: - update - patch - delete +- apiGroups: + - pipelines.kubeflow.org + resources: + - runs + verbs: + - create - apiGroups: - '' resources: