diff --git a/backend/src/apiserver/client_manager/client_manager.go b/backend/src/apiserver/client_manager/client_manager.go index 7e07ca9d460..430d9698310 100644 --- a/backend/src/apiserver/client_manager/client_manager.go +++ b/backend/src/apiserver/client_manager/client_manager.go @@ -17,6 +17,7 @@ package clientmanager import ( "context" "database/sql" + "errors" "fmt" "strings" "sync" @@ -26,6 +27,8 @@ import ( awsv2cfg "github.com/aws/aws-sdk-go-v2/config" awsv2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" "github.com/cenkalti/backoff" mysqlStd "github.com/go-sql-driver/mysql" "github.com/golang/glog" @@ -38,8 +41,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/apiserver/validation" "github.com/kubeflow/pipelines/backend/src/common/util" k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -952,15 +953,15 @@ func initBlobObjectStore(ctx context.Context, initConnectionTimeout time.Duratio return nil, fmt.Errorf("failed to build config from environment variables: %w", err) } + if err := ensureBucketExists(ctx, blobConfig); err != nil { + glog.Warningf("Failed to ensure bucket exists (may already exist): %v", err) + } + bucket, err := openBucketWithRetry(ctx, blobConfig, initConnectionTimeout) if err != nil { return nil, fmt.Errorf("failed to open blob storage bucket: %w", err) } - if err := ensureBucketExists(ctx, blobConfig); err != nil { - glog.Warningf("Failed to ensure bucket exists (may already exist): %v", err) - } - glog.Infof("Successfully initialized blob storage for bucket: %s", blobConfig.bucketName) return storage.NewBlobObjectStore(bucket, pipelinePath), nil } @@ -975,6 +976,11 @@ type blobStorageConfig struct { secretKey string } +type 
s3BucketAPI interface { + HeadBucket(ctx context.Context, params *s3.HeadBucketInput, optFns ...func(*s3.Options)) (*s3.HeadBucketOutput, error) + CreateBucket(ctx context.Context, params *s3.CreateBucketInput, optFns ...func(*s3.Options)) (*s3.CreateBucketOutput, error) +} + // ensureProtocol adds http:// or https:// protocol if not present func ensureProtocol(endpoint string, secure bool) string { if strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://") { @@ -1021,46 +1027,68 @@ func buildConfigFromEnvVars() (*blobStorageConfig, error) { }, nil } -func validateRequiredConfig(bucketName string, host string, accessKey string, secretKey string) error { +func newS3BucketClient(ctx context.Context, config *blobStorageConfig) (*s3.Client, error) { + awsCfg, err := loadAWSConfig(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to create AWS config: %w", err) + } + + endpointWithProtocol := ensureProtocol(config.endpoint, config.secure) + s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.BaseEndpoint = awsv2.String(endpointWithProtocol) + o.UsePathStyle = true + }) + + return s3Client, nil +} + +func loadAWSConfig(ctx context.Context, config *blobStorageConfig) (awsv2.Config, error) { + opts := []func(*awsv2cfg.LoadOptions) error{ + awsv2cfg.WithRegion(config.region), + } + if config.accessKey != "" && config.secretKey != "" { + opts = append(opts, awsv2cfg.WithCredentialsProvider( + awsv2creds.NewStaticCredentialsProvider(config.accessKey, config.secretKey, ""), + )) + } + cfg, err := awsv2cfg.LoadDefaultConfig(ctx, opts...) + if err != nil { + return awsv2.Config{}, err + } + return cfg, nil +} + +// validateRequiredConfig validates the required object store configuration fields. +// bucketName and host are always required. 
Credentials (accessKey/secretKey) are optional +// to support AWS IRSA (IAM Roles for Service Accounts), environment variables, +// and instance profile-based authentication through the default AWS credential chain. +// However, if credentials are provided, both accessKey and secretKey must be set. +func validateRequiredConfig(bucketName, host, accessKey, secretKey string) error { if bucketName == "" { return fmt.Errorf("ObjectStoreConfig.BucketName is required") } if host == "" { return fmt.Errorf("ObjectStoreConfig.Host is required") } - if accessKey == "" { - return fmt.Errorf("ObjectStoreConfig.AccessKey is required") - } - if secretKey == "" { - return fmt.Errorf("ObjectStoreConfig.SecretAccessKey is required") + if (accessKey != "") != (secretKey != "") { + return fmt.Errorf("ObjectStoreConfig.AccessKey and ObjectStoreConfig.SecretAccessKey must both be set or both be empty") } return nil } -// openBucketWithRetry opens a blob bucket using AWS SDK v2 with explicit credentials and retry logic +// openBucketWithRetry opens a blob bucket using AWS SDK v2 with retry logic. +// When accessKey and secretKey are empty, it uses the default credential chain +// (supports IRSA, environment variables, instance profiles, etc.) 
func openBucketWithRetry(ctx context.Context, config *blobStorageConfig, timeout time.Duration) (*blob.Bucket, error) { var bucket *blob.Bucket var err error operation := func() error { - cfg, err := awsv2cfg.LoadDefaultConfig(ctx, - awsv2cfg.WithRegion(config.region), - awsv2cfg.WithCredentialsProvider(awsv2creds.NewStaticCredentialsProvider( - config.accessKey, - config.secretKey, - "", - )), - ) + s3Client, err := newS3BucketClient(ctx, config) if err != nil { - return fmt.Errorf("failed to create AWS config: %w", err) + return err } - endpointWithProtocol := ensureProtocol(config.endpoint, config.secure) - s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) { - o.BaseEndpoint = awsv2.String(endpointWithProtocol) - o.UsePathStyle = true - }) - bucket, err = s3blob.OpenBucketV2(ctx, s3Client, config.bucketName, nil) return err } @@ -1076,38 +1104,86 @@ func openBucketWithRetry(ctx context.Context, config *blobStorageConfig, timeout return bucket, nil } -func createMinioBucket(ctx context.Context, minioClient *minio.Client, bucketName, region string) { - // Check to see if it exists, and we have permission to access it. - exists, err := minioClient.BucketExists(ctx, bucketName) - if err != nil { - glog.Fatalf("Failed to check if object store bucket exists. Error: %v", err) - } - if exists { - glog.Infof("We already own %s\n", bucketName) - return - } - // Create bucket if it does not exist - err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: region}) +// ensureBucketExists creates the bucket if it doesn't exist. +// It relies on the AWS SDK default credential chain (plus optional static creds) so IRSA/web-identity +// tokens, environment variables, and instance profiles are all supported. +func ensureBucketExists(ctx context.Context, config *blobStorageConfig) error { + s3Client, err := newS3BucketClient(ctx, config) if err != nil { - glog.Fatalf("Failed to create object store bucket. 
Error: %v", err) + return fmt.Errorf("failed to create S3 client: %w", err) } - glog.Infof("Successfully created bucket %s\n", bucketName) + + return ensureBucketExistsWithClient(ctx, s3Client, config) } -// ensureBucketExists creates a bucket if it doesn't exist -func ensureBucketExists(ctx context.Context, config *blobStorageConfig) error { - minioClient, err := minio.New(config.endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(config.accessKey, config.secretKey, ""), - Secure: config.secure, +func ensureBucketExistsWithClient(ctx context.Context, client s3BucketAPI, config *blobStorageConfig) error { + _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{ + Bucket: awsv2.String(config.bucketName), }) + if err == nil { + glog.Infof("Bucket %s already exists and is accessible", config.bucketName) + return nil + } + + if !isBucketNotFoundError(err) { + return fmt.Errorf("failed to check if bucket %s exists: %w", config.bucketName, err) + } + + glog.Infof("Bucket %s not found, attempting to create", config.bucketName) + _, err = client.CreateBucket(ctx, buildCreateBucketInput(config)) if err != nil { - return fmt.Errorf("failed to create MinIO client: %w", err) + if isBucketAlreadyOwnedByUs(err) { + glog.Infof("Bucket %s was created concurrently by us", config.bucketName) + return nil + } + return fmt.Errorf("failed to create object store bucket: %w", err) } - createMinioBucket(ctx, minioClient, config.bucketName, config.region) + glog.Infof("Successfully created bucket %s", config.bucketName) return nil } +func buildCreateBucketInput(config *blobStorageConfig) *s3.CreateBucketInput { + input := &s3.CreateBucketInput{ + Bucket: awsv2.String(config.bucketName), + } + if config.region != "" && config.region != "us-east-1" { + input.CreateBucketConfiguration = &s3types.CreateBucketConfiguration{ + LocationConstraint: s3types.BucketLocationConstraint(config.region), + } + } + + return input +} + +func isBucketNotFoundError(err error) bool { + var notFound 
*s3types.NotFound + if errors.As(err, &notFound) { + return true + } + var noSuchBucket *s3types.NoSuchBucket + if errors.As(err, &noSuchBucket) { + return true + } + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + code := apiErr.ErrorCode() + return code == "NotFound" || code == "NoSuchBucket" || code == "404" + } + return false +} + +// isBucketAlreadyOwnedByUs returns true only for BucketAlreadyOwnedByYou errors, +// which indicates a race condition where we created the bucket concurrently. +// BucketAlreadyExists (bucket owned by another account on AWS) is NOT suppressed +// so it surfaces in logs as a warning, helping detect misconfiguration. +// Note: SeaweedFS/MinIO return BucketAlreadyExists even for buckets you own, +// but since ensureBucketExists failures are just warnings, this is acceptable. +func isBucketAlreadyOwnedByUs(err error) bool { + var owned *s3types.BucketAlreadyOwnedByYou + return errors.As(err, &owned) +} + func initLogArchive() (logArchive archive.LogArchiveInterface) { logFileName := common.GetStringConfigWithDefault(archiveLogFileName, "") logPathPrefix := common.GetStringConfigWithDefault(archiveLogPathPrefix, "") diff --git a/backend/src/apiserver/client_manager/client_manager_test.go b/backend/src/apiserver/client_manager/client_manager_test.go index c68b600f52b..e743d50bb13 100644 --- a/backend/src/apiserver/client_manager/client_manager_test.go +++ b/backend/src/apiserver/client_manager/client_manager_test.go @@ -15,10 +15,16 @@ package clientmanager import ( + "context" + "errors" "fmt" + "net/http" + "net/http/httptest" "strings" "testing" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gorm.io/driver/sqlite" @@ -106,3 +112,319 @@ func TestFieldMeta_TaskRunId(t *testing.T) { assert.Equal(t, "tasks", table) assert.Equal(t, "RunUUID", dbCol) } + +func TestValidateRequiredConfig(t *testing.T) { + 
tests := []struct { + name string + bucketName string + host string + accessKey string + secretKey string + wantErr bool + errContains string + }{ + { + name: "all fields provided", + bucketName: "my-bucket", + host: "minio-service", + accessKey: "access", + secretKey: "secret", + wantErr: false, + }, + { + name: "empty bucket name", + bucketName: "", + host: "minio-service", + accessKey: "access", + secretKey: "secret", + wantErr: true, + errContains: "BucketName is required", + }, + { + name: "empty host", + bucketName: "my-bucket", + host: "", + accessKey: "access", + secretKey: "secret", + wantErr: true, + errContains: "Host is required", + }, + { + name: "empty credentials for IRSA - should pass", + bucketName: "my-bucket", + host: "s3.amazonaws.com", + accessKey: "", + secretKey: "", + wantErr: false, + }, + { + name: "only accessKey provided - should fail", + bucketName: "my-bucket", + host: "s3.amazonaws.com", + accessKey: "access", + secretKey: "", + wantErr: true, + errContains: "must both be set or both be empty", + }, + { + name: "only secretKey provided - should fail", + bucketName: "my-bucket", + host: "s3.amazonaws.com", + accessKey: "", + secretKey: "secret", + wantErr: true, + errContains: "must both be set or both be empty", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateRequiredConfig(tt.bucketName, tt.host, tt.accessKey, tt.secretKey) + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + } else { + require.NoError(t, err) + } + }) + } +} + +type fakeS3Client struct { + headErr error + createErr error + headCalls int + createCalls int + lastCreateInput *s3.CreateBucketInput +} + +func (f *fakeS3Client) HeadBucket(context.Context, *s3.HeadBucketInput, ...func(*s3.Options)) (*s3.HeadBucketOutput, error) { + f.headCalls++ + if f.headErr != nil { + return nil, f.headErr + } + return &s3.HeadBucketOutput{}, nil +} + +func (f *fakeS3Client) CreateBucket(_ 
context.Context, params *s3.CreateBucketInput, _ ...func(*s3.Options)) (*s3.CreateBucketOutput, error) { + f.createCalls++ + f.lastCreateInput = params + if f.createErr != nil { + return nil, f.createErr + } + return &s3.CreateBucketOutput{}, nil +} + +func TestEnsureBucketExistsWithClient_BucketAlreadyExists(t *testing.T) { + client := &fakeS3Client{} + cfg := &blobStorageConfig{bucketName: "existing", region: "us-east-1"} + + err := ensureBucketExistsWithClient(context.Background(), client, cfg) + require.NoError(t, err) + assert.Equal(t, 1, client.headCalls) + assert.Equal(t, 0, client.createCalls) +} + +func TestEnsureBucketExistsWithClient_CreatesBucket(t *testing.T) { + client := &fakeS3Client{headErr: &s3types.NotFound{}} + cfg := &blobStorageConfig{bucketName: "missing", region: "us-west-2"} + + err := ensureBucketExistsWithClient(context.Background(), client, cfg) + require.NoError(t, err) + assert.Equal(t, 1, client.headCalls) + assert.Equal(t, 1, client.createCalls) + require.NotNil(t, client.lastCreateInput.CreateBucketConfiguration) + assert.Equal(t, s3types.BucketLocationConstraint("us-west-2"), client.lastCreateInput.CreateBucketConfiguration.LocationConstraint) +} + +func TestEnsureBucketExistsWithClient_CreateBucketAlreadyOwned(t *testing.T) { + client := &fakeS3Client{ + headErr: &s3types.NotFound{}, + createErr: &s3types.BucketAlreadyOwnedByYou{}, + } + cfg := &blobStorageConfig{bucketName: "missing", region: "us-east-1"} + + err := ensureBucketExistsWithClient(context.Background(), client, cfg) + require.NoError(t, err) + assert.Equal(t, 1, client.headCalls) + assert.Equal(t, 1, client.createCalls) +} + +func TestEnsureBucketExistsWithClient_CreateBucketAlreadyExistsForeignOwner(t *testing.T) { + client := &fakeS3Client{ + headErr: &s3types.NotFound{}, + createErr: &s3types.BucketAlreadyExists{}, + } + cfg := &blobStorageConfig{bucketName: "foreign-bucket", region: "us-east-1"} + + err := ensureBucketExistsWithClient(context.Background(), 
client, cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create object store bucket") + assert.Equal(t, 1, client.headCalls) + assert.Equal(t, 1, client.createCalls) +} + +func TestEnsureBucketExistsWithClient_CreateBucketError(t *testing.T) { + client := &fakeS3Client{ + headErr: &s3types.NotFound{}, + createErr: errors.New("access denied"), + } + cfg := &blobStorageConfig{bucketName: "missing", region: "us-east-1"} + + err := ensureBucketExistsWithClient(context.Background(), client, cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create object store bucket") + assert.Equal(t, 1, client.headCalls) + assert.Equal(t, 1, client.createCalls) +} + +func TestEnsureBucketExistsWithClient_HeadBucketPermissionDenied(t *testing.T) { + client := &fakeS3Client{ + headErr: errors.New("access denied"), + } + cfg := &blobStorageConfig{bucketName: "forbidden", region: "us-east-1"} + + err := ensureBucketExistsWithClient(context.Background(), client, cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to check if bucket") + assert.Equal(t, 1, client.headCalls) + assert.Equal(t, 0, client.createCalls) +} + +func TestBuildCreateBucketInput_DefaultRegion(t *testing.T) { + input := buildCreateBucketInput(&blobStorageConfig{ + bucketName: "test", + region: "us-east-1", + }) + + assert.Nil(t, input.CreateBucketConfiguration) +} + +type fakeS3HTTPServer struct { + t *testing.T + server *httptest.Server + buckets map[string]bool + log []string +} + +func newFakeS3HTTPServer(t *testing.T) *fakeS3HTTPServer { + f := &fakeS3HTTPServer{ + t: t, + buckets: make(map[string]bool), + } + f.server = httptest.NewServer(http.HandlerFunc(f.handle)) + t.Cleanup(f.server.Close) + return f +} + +func (f *fakeS3HTTPServer) endpoint() string { + return strings.TrimPrefix(f.server.URL, "http://") +} + +func (f *fakeS3HTTPServer) handle(w http.ResponseWriter, r *http.Request) { + parts := strings.Split(strings.TrimPrefix(r.URL.Path, 
"/"), "/") + if len(parts) == 0 || parts[0] == "" { + http.Error(w, "missing bucket", http.StatusBadRequest) + return + } + bucket := parts[0] + + switch r.Method { + case http.MethodHead: + f.log = append(f.log, "HEAD:"+bucket) + if f.buckets[bucket] { + w.WriteHeader(http.StatusOK) + return + } + http.Error(w, "not found", http.StatusNotFound) + case http.MethodPut: + f.log = append(f.log, "PUT:"+bucket) + if f.buckets[bucket] { + // Simulate already existing bucket. + http.Error(w, "BucketAlreadyOwnedByYou", http.StatusConflict) + return + } + f.buckets[bucket] = true + w.WriteHeader(http.StatusOK) + default: + http.Error(w, "unsupported", http.StatusMethodNotAllowed) + } +} + +func (f *fakeS3HTTPServer) setBucket(name string) { + f.buckets[name] = true +} + +func (f *fakeS3HTTPServer) requestLog() []string { + return f.log +} + +func TestEnsureBucketExists_IntegrationCreatesBucket(t *testing.T) { + server := newFakeS3HTTPServer(t) + cfg := &blobStorageConfig{ + bucketName: "integration-bucket", + endpoint: server.endpoint(), + region: "us-west-2", + secure: false, + accessKey: "key", + secretKey: "secret", + } + + err := ensureBucketExists(context.Background(), cfg) + require.NoError(t, err) + + log := server.requestLog() + require.Len(t, log, 2) + assert.Equal(t, "HEAD:integration-bucket", log[0]) + assert.Equal(t, "PUT:integration-bucket", log[1]) +} + +func TestEnsureBucketExists_IntegrationBucketAlreadyExists(t *testing.T) { + server := newFakeS3HTTPServer(t) + server.setBucket("existing-bucket") + + cfg := &blobStorageConfig{ + bucketName: "existing-bucket", + endpoint: server.endpoint(), + region: "us-east-1", + secure: false, + accessKey: "key", + secretKey: "secret", + } + + err := ensureBucketExists(context.Background(), cfg) + require.NoError(t, err) + + log := server.requestLog() + require.Len(t, log, 1) + assert.Equal(t, "HEAD:existing-bucket", log[0]) +} + +func TestLoadAWSConfig_EmptyCredentials(t *testing.T) { + cfg := &blobStorageConfig{ + 
region: "us-west-2", + accessKey: "", + secretKey: "", + } + + awsCfg, err := loadAWSConfig(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, "us-west-2", awsCfg.Region) +} + +func TestLoadAWSConfig_WithCredentials(t *testing.T) { + cfg := &blobStorageConfig{ + region: "us-east-1", + accessKey: "test-key", + secretKey: "test-secret", + } + + awsCfg, err := loadAWSConfig(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, "us-east-1", awsCfg.Region) + + creds, err := awsCfg.Credentials.Retrieve(context.Background()) + require.NoError(t, err) + assert.Equal(t, "test-key", creds.AccessKeyID) + assert.Equal(t, "test-secret", creds.SecretAccessKey) +} diff --git a/go.mod b/go.mod index ba0d5958360..2c5ddfe4edb 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.29.14 github.com/aws/aws-sdk-go-v2/credentials v1.17.67 github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 + github.com/aws/smithy-go v1.22.3 gorm.io/driver/mysql v1.6.0 gorm.io/driver/postgres v1.6.0 gorm.io/driver/sqlite v1.6.0 @@ -112,7 +113,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect - github.com/aws/smithy-go v1.22.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect