Skip to content

Commit

Permalink
Add support for importing models stored in the Modelcar format
Browse files Browse the repository at this point in the history
This allows dsl.import to leverage Modelcar container images in an OCI
repository. This works by having an init container prepull the image and
then adding a sidecar container when the launcher container is running.
The Modelcar container adds a symlink to its /models directory in an
emptyDir volume that is accessible by the launcher container. Once the
launcher is done running the user code, it stops the Modelcar container
images.

This approach has the benefit of leveraging image pull secrets
configured on the Kubernetes cluster rather than require separate
credentials for importing the artifact. Additionally, no data is copied
to the emptyDir volume, so the storage cost is just pulling the Modelcar
container image on the Kubernetes worker node.

Note that once Kubernetes supports OCI images as volume mounts for
several releases, consider replacing the init container with that
approach.

Resolves:
#11584

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl committed Feb 8, 2025
1 parent 6cb7cf7 commit fdc3233
Show file tree
Hide file tree
Showing 11 changed files with 424 additions and 18 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/kfp-samples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ jobs:
with:
k8s_version: ${{ matrix.k8s_version }}

- name: Build and upload the sample Modelcar image to Kind
run: |
docker build -f samples/v2/modelcar_import/Dockerfile -t registry.domain.local/modelcar:test .
kind --name kfp load docker-image registry.domain.local/modelcar:test
- name: Forward API port
run: ./.github/resources/scripts/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888

- name: Run Samples Tests
env:
PULL_NUMBER: ${{ github.event.pull_request.number }}
run: |
./backend/src/v2/test/sample-test.sh
24 changes: 20 additions & 4 deletions backend/src/v2/component/importer_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/kubeflow/pipelines/backend/src/v2/objectstore"

pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
Expand Down Expand Up @@ -227,10 +229,6 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact

state := pb.Artifact_LIVE

provider, err := objectstore.ParseProviderFromPath(artifactUri)
if err != nil {
return nil, fmt.Errorf("No Provider scheme found in artifact Uri: %s", artifactUri)
}
artifact = &pb.Artifact{
TypeId: &artifactTypeId,
State: &state,
Expand All @@ -248,6 +246,24 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
}
}

if strings.HasPrefix(artifactUri, "oci://") {
artifactType, err := metadata.SchemaToArtifactType(schema)
if err != nil {
return nil, fmt.Errorf("converting schema to artifact type failed: %w", err)
}

if *artifactType.Name != "system.Model" {
return nil, fmt.Errorf("the %s artifact type does not support OCI registries", *artifactType.Name)
}

return artifact, nil
}

provider, err := objectstore.ParseProviderFromPath(artifactUri)
if err != nil {
return nil, fmt.Errorf("no provider scheme found in artifact URI: %s", artifactUri)
}

// Assume all imported artifacts will rely on execution environment for store provider session info
storeSessionInfo := objectstore.SessionInfo{
Provider: provider,
Expand Down
132 changes: 118 additions & 14 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"time"

"github.com/golang/glog"
Expand All @@ -43,6 +45,12 @@ import (
"k8s.io/client-go/rest"
)

var findPIDRegex *regexp.Regexp

func init() {
findPIDRegex = regexp.MustCompile(`\/proc\/(\d+)\/.+`)
}

type LauncherV2Options struct {
Namespace,
PodName,
Expand Down Expand Up @@ -398,9 +406,15 @@ func execute(
namespace string,
k8sClient kubernetes.Interface,
) (*pipelinespec.ExecutorOutput, error) {
if err := downloadArtifacts(ctx, executorInput, bucket, bucketConfig, namespace, k8sClient); err != nil {
cleanUpFuncs, err := downloadArtifacts(ctx, executorInput, bucket, bucketConfig, namespace, k8sClient)
for _, cleanUpFunc := range cleanUpFuncs {
defer cleanUpFunc()
}

if err != nil {
return nil, err
}

if err := prepareOutputFolders(executorInput); err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,7 +455,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
}

// Upload artifacts from local path to remote storages.
localDir, err := localPathForURI(outputArtifact.Uri)
localDir, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
} else {
Expand Down Expand Up @@ -477,7 +491,72 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
return outputArtifacts, nil
}

func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.ExecutorInput, defaultBucket *blob.Bucket, defaultBucketConfig *objectstore.Config, namespace string, k8sClient kubernetes.Interface) error {
// waitForModelcar assumes the Modelcar has already been validated by the init container on the launcher
// pod. This waits for the Modelcar as a sidecar container to be ready. It returns a function that will stop
// the Modelcar container that is to be run after the user code has completed.
func waitForModelcar(artifactURI string, localPath string) func() {
glog.Infof("Waiting for the Modelcar %s to be available", artifactURI)

for {
_, err := os.Stat(localPath)
if err != nil {
time.Sleep(500 * time.Millisecond)

continue
}

targetPath, err := os.Readlink(localPath)
if err != nil {
glog.Infof(
"Expected the Modelcar local path to be a symlink, will not stop Modelcar %s", artifactURI,
)

return nil
}

matches := findPIDRegex.FindStringSubmatch(targetPath)
if len(matches) != 2 {
glog.Infof(
"Expected the Modelcar symlink (%s) to start with /proc/$pid, will not stop Modelcar %s",
targetPath, artifactURI,
)

return nil
}

pidStr := matches[1]

pid, err := strconv.Atoi(pidStr)
if err != nil {
glog.Infof(
"Expected the Modelcar symlink (%s) target to start with /proc/$pid, will not stop Modelcar %s",
targetPath, artifactURI,
)

return nil
}

return func() {
glog.Infof("Stopping the Modelcar: %s", artifactURI)

process, err := os.FindProcess(pid)
if err != nil {
// If the process stopped already, nothing to do
return
}

err = process.Signal(syscall.SIGHUP)
if err != nil {
glog.Error("Error stopping the Modelcar %s due to: %v", artifactURI, err)

return
}
}
}
}

// downloadArtifacts returns a slice of functions to call for artifact clean up after user code has completed.
func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.ExecutorInput, defaultBucket *blob.Bucket, defaultBucketConfig *objectstore.Config, namespace string, k8sClient kubernetes.Interface) ([]func(), error) {
// Read input artifact metadata.
nonDefaultBuckets, err := fetchNonDefaultBuckets(ctx, executorInput.GetInputs().GetArtifacts(), defaultBucketConfig, namespace, k8sClient)
closeNonDefaultBuckets := func(buckets map[string]*blob.Bucket) {
Expand All @@ -489,19 +568,35 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
}
defer closeNonDefaultBuckets(nonDefaultBuckets)
if err != nil {
return fmt.Errorf("failed to fetch non default buckets: %w", err)
return nil, fmt.Errorf("failed to fetch non default buckets: %w", err)
}

cleanUpFuncs := []func(){}

for name, artifactList := range executorInput.GetInputs().GetArtifacts() {
// TODO(neuromage): Support concat-based placholders for arguments.
if len(artifactList.Artifacts) == 0 {
continue
}
inputArtifact := artifactList.Artifacts[0]
localPath, err := localPathForURI(inputArtifact.Uri)

localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)

continue
}

// OCI artifacts are handled specially
if strings.HasPrefix(inputArtifact.Uri, "oci://") {
cleanUpFunc := waitForModelcar(inputArtifact.Uri, localPath)
if cleanUpFunc != nil {
cleanUpFuncs = append(cleanUpFuncs, cleanUpFunc)
}

continue
}

// Copy artifact to local storage.
copyErr := func(err error) error {
return fmt.Errorf("failed to download input artifact %q from remote storage URI %q: %w", name, inputArtifact.Uri, err)
Expand All @@ -513,25 +608,25 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
if !strings.HasPrefix(inputArtifact.Uri, defaultBucketConfig.PrefixedBucket()) {
nonDefaultBucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(inputArtifact.Uri)
if err != nil {
return fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, inputArtifact.GetUri(), err)
return cleanUpFuncs, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, inputArtifact.GetUri(), err)
}
nonDefaultBucket, ok := nonDefaultBuckets[nonDefaultBucketConfig.PrefixedBucket()]
if !ok {
return fmt.Errorf("failed to get bucket when downloading input artifact %s with bucket key %s: %w", name, nonDefaultBucketConfig.PrefixedBucket(), err)
return cleanUpFuncs, fmt.Errorf("failed to get bucket when downloading input artifact %s with bucket key %s: %w", name, nonDefaultBucketConfig.PrefixedBucket(), err)
}
bucket = nonDefaultBucket
bucketConfig = nonDefaultBucketConfig
}
blobKey, err := bucketConfig.KeyFromURI(inputArtifact.Uri)
if err != nil {
return copyErr(err)
return cleanUpFuncs, copyErr(err)
}
if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
return copyErr(err)
return cleanUpFuncs, copyErr(err)
}

}
return nil
return cleanUpFuncs, nil
}

func fetchNonDefaultBuckets(
Expand All @@ -548,6 +643,12 @@ func fetchNonDefaultBuckets(
}
// TODO: Support multiple artifacts someday, probably through the v2 engine.
artifact := artifactList.Artifacts[0]

// OCI artifacts are handled specially
if strings.HasPrefix(artifact.Uri, "oci://") {
continue
}

// The artifact does not belong under the object store path for this run. Cases:
// 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath
// 2. Artifact is imported from the same bucket, but from a different path (re-use the same session)
Expand Down Expand Up @@ -598,7 +699,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
placeholders[key] = inputArtifact.Uri

localPath, err := localPathForURI(inputArtifact.Uri)
localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
// Input Artifact does not have a recognized storage URI
continue
Expand All @@ -617,7 +718,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
outputArtifact := artifactList.Artifacts[0]
placeholders[fmt.Sprintf(`{{$.outputs.artifacts['%s'].uri}}`, name)] = outputArtifact.Uri

localPath, err := localPathForURI(outputArtifact.Uri)
localPath, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
return nil, fmt.Errorf("resolve output artifact %q's local path: %w", name, err)
}
Expand Down Expand Up @@ -720,7 +821,7 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
return executorOutput, nil
}

func localPathForURI(uri string) (string, error) {
func LocalPathForURI(uri string) (string, error) {
if strings.HasPrefix(uri, "gs://") {
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
}
Expand All @@ -730,6 +831,9 @@ func localPathForURI(uri string) (string, error) {
if strings.HasPrefix(uri, "s3://") {
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
}
if strings.HasPrefix(uri, "oci://") {
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "\\/") + "/models", nil
}
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
}

Expand All @@ -747,7 +851,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
}
outputArtifact := artifactList.Artifacts[0]

localPath, err := localPathForURI(outputArtifact.Uri)
localPath, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
}
Expand Down
Loading

0 comments on commit fdc3233

Please sign in to comment.