174 changes: 125 additions & 49 deletions backend/src/apiserver/client_manager/client_manager.go
@@ -17,6 +17,7 @@ package clientmanager
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
@@ -26,6 +27,8 @@ import (
awsv2cfg "github.com/aws/aws-sdk-go-v2/config"
awsv2creds "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/cenkalti/backoff"
mysqlStd "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
@@ -38,8 +41,6 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/validation"
"github.com/kubeflow/pipelines/backend/src/common/util"
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@@ -952,15 +953,15 @@ func initBlobObjectStore(ctx context.Context, initConnectionTimeout time.Duratio
return nil, fmt.Errorf("failed to build config from environment variables: %w", err)
}

if err := ensureBucketExists(ctx, blobConfig); err != nil {
glog.Warningf("Failed to ensure bucket exists (may already exist): %v", err)
}

bucket, err := openBucketWithRetry(ctx, blobConfig, initConnectionTimeout)
if err != nil {
return nil, fmt.Errorf("failed to open blob storage bucket: %w", err)
}

if err := ensureBucketExists(ctx, blobConfig); err != nil {
glog.Warningf("Failed to ensure bucket exists (may already exist): %v", err)
}

glog.Infof("Successfully initialized blob storage for bucket: %s", blobConfig.bucketName)
return storage.NewBlobObjectStore(bucket, pipelinePath), nil
}
@@ -975,6 +976,11 @@ type blobStorageConfig struct {
secretKey string
}

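// s3BucketAPI captures the subset of the AWS S3 client used to check for and create the
// pipeline bucket; *s3.Client satisfies it, and a fake implementation can stand in during unit tests.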
type s3BucketAPI interface {
HeadBucket(ctx context.Context, params *s3.HeadBucketInput, optFns ...func(*s3.Options)) (*s3.HeadBucketOutput, error)
CreateBucket(ctx context.Context, params *s3.CreateBucketInput, optFns ...func(*s3.Options)) (*s3.CreateBucketOutput, error)
}

// ensureProtocol adds http:// or https:// protocol if not present
func ensureProtocol(endpoint string, secure bool) string {
if strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://") {
@@ -1021,46 +1027,68 @@ func buildConfigFromEnvVars() (*blobStorageConfig, error) {
}, nil
}

func validateRequiredConfig(bucketName string, host string, accessKey string, secretKey string) error {
func newS3BucketClient(ctx context.Context, config *blobStorageConfig) (*s3.Client, error) {
awsCfg, err := loadAWSConfig(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to create AWS config: %w", err)
}

endpointWithProtocol := ensureProtocol(config.endpoint, config.secure)
s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.BaseEndpoint = awsv2.String(endpointWithProtocol)
o.UsePathStyle = true
})

return s3Client, nil
}

func loadAWSConfig(ctx context.Context, config *blobStorageConfig) (awsv2.Config, error) {
opts := []func(*awsv2cfg.LoadOptions) error{
awsv2cfg.WithRegion(config.region),
}
if config.accessKey != "" && config.secretKey != "" {
opts = append(opts, awsv2cfg.WithCredentialsProvider(
awsv2creds.NewStaticCredentialsProvider(config.accessKey, config.secretKey, ""),
))
}
cfg, err := awsv2cfg.LoadDefaultConfig(ctx, opts...)
if err != nil {
return awsv2.Config{}, err
}
return cfg, nil
}
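
// Illustrative sketch of the two credential paths loadAWSConfig can take (the values below are
// hypothetical and not part of this change):
//
//	// Static credentials supplied via ObjectStoreConfig:
//	cfg, _ := loadAWSConfig(ctx, &blobStorageConfig{region: "us-east-1", accessKey: "minio", secretKey: "minio123"})
//	// Empty credentials fall through to the default chain, e.g. IRSA, where the pod typically
//	// carries AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE injected by the EKS webhook:
//	cfg, _ = loadAWSConfig(ctx, &blobStorageConfig{region: "us-east-1"})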

// validateRequiredConfig validates the required object store configuration fields.
// bucketName and host are always required. Credentials (accessKey/secretKey) are optional
// to support AWS IRSA (IAM Roles for Service Accounts), environment variables,
// and instance profile-based authentication through the default AWS credential chain.
// However, if credentials are provided, both accessKey and secretKey must be set.
func validateRequiredConfig(bucketName, host, accessKey, secretKey string) error {
if bucketName == "" {
return fmt.Errorf("ObjectStoreConfig.BucketName is required")
}
if host == "" {
return fmt.Errorf("ObjectStoreConfig.Host is required")
}
if accessKey == "" {
return fmt.Errorf("ObjectStoreConfig.AccessKey is required")
}
if secretKey == "" {
return fmt.Errorf("ObjectStoreConfig.SecretAccessKey is required")
if (accessKey != "") != (secretKey != "") {
return fmt.Errorf("ObjectStoreConfig.AccessKey and ObjectStoreConfig.SecretAccessKey must both be set or both be empty")
}
return nil
}
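
// Illustrative outcomes of the rule above (hypothetical values, not part of this change):
//
//	validateRequiredConfig("mlpipeline", "s3.amazonaws.com", "", "")             // nil: default credential chain (e.g. IRSA)
//	validateRequiredConfig("mlpipeline", "minio-service:9000", "key", "secret")  // nil: static credentials
//	validateRequiredConfig("mlpipeline", "minio-service:9000", "key", "")        // error: secret key missing
//	validateRequiredConfig("", "minio-service:9000", "key", "secret")            // error: bucket name required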

// openBucketWithRetry opens a blob bucket using AWS SDK v2 with explicit credentials and retry logic
// openBucketWithRetry opens a blob bucket using AWS SDK v2 with retry logic.
// When accessKey and secretKey are empty, it uses the default credential chain
// (supports IRSA, environment variables, instance profiles, etc.)
func openBucketWithRetry(ctx context.Context, config *blobStorageConfig, timeout time.Duration) (*blob.Bucket, error) {
var bucket *blob.Bucket
var err error

operation := func() error {
cfg, err := awsv2cfg.LoadDefaultConfig(ctx,
awsv2cfg.WithRegion(config.region),
awsv2cfg.WithCredentialsProvider(awsv2creds.NewStaticCredentialsProvider(
config.accessKey,
config.secretKey,
"",
)),
)
s3Client, err := newS3BucketClient(ctx, config)
if err != nil {
return fmt.Errorf("failed to create AWS config: %w", err)
return err
}

endpointWithProtocol := ensureProtocol(config.endpoint, config.secure)
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.BaseEndpoint = awsv2.String(endpointWithProtocol)
o.UsePathStyle = true
})

bucket, err = s3blob.OpenBucketV2(ctx, s3Client, config.bucketName, nil)
return err
}
@@ -1076,38 +1104,86 @@ func openBucketWithRetry(ctx context.Context, config *blobStorageConfig, timeout
return bucket, nil
}

func createMinioBucket(ctx context.Context, minioClient *minio.Client, bucketName, region string) {
// Check to see if it exists, and we have permission to access it.
exists, err := minioClient.BucketExists(ctx, bucketName)
if err != nil {
glog.Fatalf("Failed to check if object store bucket exists. Error: %v", err)
}
if exists {
glog.Infof("We already own %s\n", bucketName)
return
}
// Create bucket if it does not exist
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: region})
// ensureBucketExists creates the bucket if it doesn't exist.
// It relies on the AWS SDK default credential chain (plus optional static creds) so IRSA/web-identity
// tokens, environment variables, and instance profiles are all supported.
func ensureBucketExists(ctx context.Context, config *blobStorageConfig) error {
s3Client, err := newS3BucketClient(ctx, config)
if err != nil {
glog.Fatalf("Failed to create object store bucket. Error: %v", err)
return fmt.Errorf("failed to create S3 client: %w", err)
}
glog.Infof("Successfully created bucket %s\n", bucketName)

return ensureBucketExistsWithClient(ctx, s3Client, config)
}

// ensureBucketExists creates a bucket if it doesn't exist
func ensureBucketExists(ctx context.Context, config *blobStorageConfig) error {
minioClient, err := minio.New(config.endpoint, &minio.Options{
Creds: credentials.NewStaticV4(config.accessKey, config.secretKey, ""),
Secure: config.secure,
func ensureBucketExistsWithClient(ctx context.Context, client s3BucketAPI, config *blobStorageConfig) error {
_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
Bucket: awsv2.String(config.bucketName),
})
if err == nil {
glog.Infof("Bucket %s already exists and is accessible", config.bucketName)
return nil
}

if !isBucketNotFoundError(err) {
return fmt.Errorf("failed to check if bucket %s exists: %w", config.bucketName, err)
}

glog.Infof("Bucket %s not found, attempting to create", config.bucketName)
_, err = client.CreateBucket(ctx, buildCreateBucketInput(config))
if err != nil {
return fmt.Errorf("failed to create MinIO client: %w", err)
if isBucketAlreadyOwnedByUs(err) {
glog.Infof("Bucket %s was created concurrently by us", config.bucketName)
return nil
}
return fmt.Errorf("failed to create object store bucket: %w", err)
}

createMinioBucket(ctx, minioClient, config.bucketName, config.region)
glog.Infof("Successfully created bucket %s", config.bucketName)
return nil
}
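
// A minimal unit-test sketch for ensureBucketExistsWithClient through the s3BucketAPI seam.
// The fakeS3 type and the test below are illustrative only (they are not part of this change)
// and would live in a _test.go file in the clientmanager package, importing "context",
// "testing", the s3 package, and s3types.
type fakeS3 struct {
	headErr   error
	createErr error
	created   bool
}

func (f *fakeS3) HeadBucket(_ context.Context, _ *s3.HeadBucketInput, _ ...func(*s3.Options)) (*s3.HeadBucketOutput, error) {
	return &s3.HeadBucketOutput{}, f.headErr
}

func (f *fakeS3) CreateBucket(_ context.Context, _ *s3.CreateBucketInput, _ ...func(*s3.Options)) (*s3.CreateBucketOutput, error) {
	f.created = true
	return &s3.CreateBucketOutput{}, f.createErr
}

func TestEnsureBucketExistsCreatesMissingBucket(t *testing.T) {
	// HeadBucket reports the bucket as missing, so CreateBucket should be called.
	fake := &fakeS3{headErr: &s3types.NotFound{}}
	cfg := &blobStorageConfig{bucketName: "mlpipeline", region: "us-east-2"}

	if err := ensureBucketExistsWithClient(context.Background(), fake, cfg); err != nil {
		t.Fatalf("expected missing bucket to be created, got error: %v", err)
	}
	if !fake.created {
		t.Fatal("expected CreateBucket to be called for a missing bucket")
	}
}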

func buildCreateBucketInput(config *blobStorageConfig) *s3.CreateBucketInput {
input := &s3.CreateBucketInput{
Bucket: awsv2.String(config.bucketName),
}
if config.region != "" && config.region != "us-east-1" {
input.CreateBucketConfiguration = &s3types.CreateBucketConfiguration{
LocationConstraint: s3types.BucketLocationConstraint(config.region),
}
}

return input
}
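
// Note: the us-east-1 exception above reflects S3 behaviour: us-east-1 is the default location,
// and CreateBucket rejects requests that name it explicitly as a LocationConstraint
// (InvalidLocationConstraint), so the configuration block is only attached for other regions
// such as eu-west-1.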

func isBucketNotFoundError(err error) bool {
var notFound *s3types.NotFound
if errors.As(err, &notFound) {
return true
}
var noSuchBucket *s3types.NoSuchBucket
if errors.As(err, &noSuchBucket) {
return true
}
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
code := apiErr.ErrorCode()
return code == "NotFound" || code == "NoSuchBucket" || code == "404"
}
return false
}
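
// HeadBucket on AWS typically surfaces a bare 404 (modelled as *s3types.NotFound), while some
// S3-compatible stores answer with NoSuchBucket or only a raw error code, which is why all
// three checks above are needed.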

// isBucketAlreadyOwnedByUs returns true only for BucketAlreadyOwnedByYou errors,
// which indicates a race condition where we created the bucket concurrently.
// BucketAlreadyExists (bucket owned by another account on AWS) is NOT suppressed
// so it surfaces in logs as a warning, helping detect misconfiguration.
// Note: SeaweedFS/MinIO return BucketAlreadyExists even for buckets you own,
// but since ensureBucketExists failures are just warnings, this is acceptable.
func isBucketAlreadyOwnedByUs(err error) bool {
var owned *s3types.BucketAlreadyOwnedByYou
return errors.As(err, &owned)
}

func initLogArchive() (logArchive archive.LogArchiveInterface) {
logFileName := common.GetStringConfigWithDefault(archiveLogFileName, "")
logPathPrefix := common.GetStringConfigWithDefault(archiveLogPathPrefix, "")