Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Achieve capacity limits #41

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Achieve capacity limits
duanhongyi committed Jul 22, 2022
commit 87270a7085108907647a3fdd4b85e09a2e0771c2
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -3,3 +3,4 @@ goofys
goofys.test
xout
s3proxy.jar
.vscode
10 changes: 7 additions & 3 deletions api/common/config.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ import (
)

type PartSizeConfig struct {
PartSize uint64
PartSize uint64
PartCount uint64
}

@@ -45,8 +45,12 @@ type FlagStorage struct {

// Common Backend Config
UseContentType bool
Endpoint string
Backend interface{}

Provider string
Capacity uint64
DiskUsageInterval uint64
Endpoint string
Backend interface{}

// Tuning
MemoryLimit uint64
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -16,24 +16,21 @@ require (
github.com/aws/aws-sdk-go v1.38.7
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.2
github.com/gopherjs/gopherjs v0.0.0-20210202160940-bed99a852dfe // indirect
github.com/jacobsa/fuse v0.0.0-20210818065549-10d864429bf7
github.com/jtolds/gls v4.2.0+incompatible // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/kr/pretty v0.1.1-0.20190720101428-71e7e4993750 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190805055040-f9202b1cfdeb // indirect
github.com/minio/madmin-go v1.4.6
github.com/mitchellh/go-homedir v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/sevlyar/go-daemon v0.1.5
github.com/shirou/gopsutil v0.0.0-20190731134726-d80c43f9c984
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/assertions v0.0.0-20160201214316-443d812296a8 // indirect
github.com/smartystreets/goconvey v1.6.1-0.20160119221636-995f5b2e021c // indirect
github.com/urfave/cli v1.21.1-0.20190807111034-521735b7608a
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
google.golang.org/api v0.49.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
gopkg.in/ini.v1 v1.46.0
gopkg.in/ini.v1 v1.62.0
)

replace github.com/aws/aws-sdk-go => ./s3ext
163 changes: 163 additions & 0 deletions go.sum

Large diffs are not rendered by default.

26 changes: 22 additions & 4 deletions internal/backend.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ import (
)

type Capabilities struct {
MaxMultipartSize uint64
MaxMultipartSize uint64
// indicates that the blob store has native support for directories
DirBlob bool
Name string
@@ -44,7 +44,7 @@ type BlobItemOutput struct {
Size uint64
StorageClass *string
// may be nil in list responses for backends that don't return metadata in listings
Metadata map[string]*string
Metadata map[string]*string
}

type HeadBlobOutput struct {
@@ -179,7 +179,7 @@ type MultipartBlobAddInput struct {

type MultipartBlobAddOutput struct {
RequestId string
PartId *string
PartId *string
}

type MultipartBlobCopyInput struct {
@@ -192,7 +192,7 @@ type MultipartBlobCopyInput struct {

type MultipartBlobCopyOutput struct {
RequestId string
PartId *string
PartId *string
}

type MultipartBlobCommitOutput struct {
@@ -228,6 +228,14 @@ type MakeBucketOutput struct {
RequestId string
}

// GetBucketUsageInput carries the parameters for a GetBucketUsage call.
// It is currently empty: the target bucket is implied by the backend instance.
type GetBucketUsageInput struct {
}

// GetBucketUsageOutput reports how much data a bucket currently holds.
type GetBucketUsageOutput struct {
	// Size is the total bucket size in bytes; backends that cannot
	// report usage return 0 here.
	Size uint64
	// RequestId identifies the backend request, when available.
	RequestId string
}

/// Implementations of all the functions here are expected to be
/// concurrency-safe, except for
///
@@ -256,6 +264,7 @@ type StorageBackend interface {
MultipartExpire(param *MultipartExpireInput) (*MultipartExpireOutput, error)
RemoveBucket(param *RemoveBucketInput) (*RemoveBucketOutput, error)
MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error)
GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error)
Delegate() interface{}
}

@@ -424,6 +433,11 @@ func (s *StorageBackendInitWrapper) MakeBucket(param *MakeBucketInput) (*MakeBuc
return s.StorageBackend.MakeBucket(param)
}

// GetBucketUsage lazily initializes the wrapped backend and then forwards
// the bucket usage query to it.
func (s *StorageBackendInitWrapper) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	s.Init("")
	out, err := s.StorageBackend.GetBucketUsage(param)
	return out, err
}

type StorageBackendInitError struct {
error
cap Capabilities
@@ -547,3 +561,7 @@ func (e StorageBackendInitError) RemoveBucket(param *RemoveBucketInput) (*Remove
// MakeBucket always fails with the stored initialization error.
func (e StorageBackendInitError) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
	var out *MakeBucketOutput
	return out, e
}

// GetBucketUsage always fails with the stored initialization error.
func (e StorageBackendInitError) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	var out *GetBucketUsageOutput
	return out, e
}
8 changes: 6 additions & 2 deletions internal/backend_adlv1.go
Original file line number Diff line number Diff line change
@@ -152,8 +152,8 @@ func NewADLv1(bucket string, flags *FlagStorage, config *ADLv1Config) (*ADLv1, e
bucket: bucket,
cap: Capabilities{
//NoParallelMultipart: true,
DirBlob: true,
Name: "adl",
DirBlob: true,
Name: "adl",
// ADLv1 fails with 404 if we upload data
// larger than 30000000 bytes (28.6MB) (28MB
// also failed in at one point, but as of
@@ -701,3 +701,7 @@ func (b *ADLv1) mkdir(dir string) error {
}
return nil
}

// GetBucketUsage is a stub: this backend does not report bucket usage,
// so the returned size is always zero.
func (b *ADLv1) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	var usage GetBucketUsageOutput
	return &usage, nil
}
4 changes: 4 additions & 0 deletions internal/backend_adlv2.go
Original file line number Diff line number Diff line change
@@ -207,6 +207,10 @@ func (b *ADLv2) Capabilities() *Capabilities {
return &b.cap
}

// GetBucketUsage is a stub: this backend does not report bucket usage,
// so the returned size is always zero.
func (b *ADLv2) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	var usage GetBucketUsageOutput
	return &usage, nil
}

type ADL2Error struct {
adl2.DataLakeStorageError
}
10 changes: 7 additions & 3 deletions internal/backend_azblob.go
Original file line number Diff line number Diff line change
@@ -222,7 +222,7 @@ func (b *AZBlob) refreshToken() (*azblob.ContainerURL, error) {
// our token totally expired, renew inline before using it
b.mu.Unlock()
b.tokenRenewGate <- 1
defer func() { <- b.tokenRenewGate } ()
defer func() { <-b.tokenRenewGate }()

b.mu.Lock()
// check again, because in the mean time maybe it's renewed
@@ -248,7 +248,7 @@ func (b *AZBlob) refreshToken() (*azblob.ContainerURL, error) {
if err != nil {
azbLog.Errorf("Unable to refresh token: %v", err)
}
<- b.tokenRenewGate
<-b.tokenRenewGate
}()

// if we cannot renew token, treat it as a
@@ -665,7 +665,7 @@ func (b *AZBlob) DeleteBlobs(param *DeleteBlobsInput) (ret *DeleteBlobsOutput, d

go func(key string) {
defer func() {
<- SmallActionsGate
<-SmallActionsGate
wg.Done()
}()

@@ -947,3 +947,7 @@ func (b *AZBlob) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
}
return &MakeBucketOutput{}, nil
}

// GetBucketUsage is a stub: this backend does not report bucket usage,
// so the returned size is always zero.
func (b *AZBlob) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	var usage GetBucketUsageOutput
	return &usage, nil
}
16 changes: 10 additions & 6 deletions internal/backend_gcs3.go
Original file line number Diff line number Diff line change
@@ -23,15 +23,15 @@ import (
"sync"
"syscall"

"github.com/jacobsa/fuse"
"cloud.google.com/go/storage"
"github.com/jacobsa/fuse"
"google.golang.org/api/iterator"
)

// GCS variant of S3
type GCS3 struct {
*S3Backend
gcs *storage.Client
gcs *storage.Client
jsonCredFile string
}

@@ -103,12 +103,12 @@ func (s *GCS3) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
})
} else {
items = append(items, BlobItemOutput{
Key: &attrs.Name,
ETag: &attrs.Etag,
Key: &attrs.Name,
ETag: &attrs.Etag,
LastModified: &attrs.Updated,
Size: uint64(attrs.Size),
Size: uint64(attrs.Size),
StorageClass: &attrs.StorageClass,
Metadata: PMetadata(attrs.Metadata),
Metadata: PMetadata(attrs.Metadata),
})
}
n++
@@ -156,3 +156,7 @@ func (s *GCS3) DeleteBlobs(param *DeleteBlobsInput) (*DeleteBlobsOutput, error)
// MultipartBlobCopy is not supported by the GCS3 backend; it always
// returns ENOSYS so callers can fall back to another copy strategy.
func (s *GCS3) MultipartBlobCopy(param *MultipartBlobCopyInput) (*MultipartBlobCopyOutput, error) {
	return nil, syscall.ENOSYS
}

// GetBucketUsage is a stub: this backend does not report bucket usage,
// so the returned size is always zero.
func (s *GCS3) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	var usage GetBucketUsageOutput
	return &usage, nil
}
68 changes: 68 additions & 0 deletions internal/backend_minio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2019 Ka-Hing Cheung
// Copyright 2021 Yandex LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"
"encoding/json"
"io/ioutil"
"net/url"

"github.com/minio/madmin-go"
. "github.com/yandex-cloud/geesefs/api/common"
)

// BucketsUsage mirrors the subset of the MinIO admin "datausageinfo"
// response that we consume.
type BucketsUsage struct {
	// BucketsSizes maps bucket name to its size — presumably in bytes,
	// matching GetBucketUsageOutput.Size; verify against the MinIO admin API.
	BucketsSizes map[string]uint64
}

// MinioBackend is an S3 backend specialization that additionally knows
// how to query bucket usage through the MinIO admin API.
type MinioBackend struct {
	*S3Backend
}

// NewMinio builds a MinIO-flavored backend for the given bucket by
// wrapping a regular S3 backend and renaming its capability to "minio".
func NewMinio(bucket string, flags *FlagStorage, config *S3Config) (*MinioBackend, error) {
	inner, err := NewS3(bucket, flags, config)
	if err != nil {
		return nil, err
	}
	inner.Capabilities().Name = "minio"
	return &MinioBackend{S3Backend: inner}, nil
}

// GetBucketUsage queries the MinIO admin API ("datausageinfo" endpoint)
// and returns the current size of this backend's bucket in bytes.
//
// Fixes over the previous version: the url.Parse error is no longer
// ignored (a malformed endpoint would have dereferenced a nil URL), the
// response body is closed, and a JSON decode failure is surfaced instead
// of being silently reported as a zero-sized bucket.
func (s *MinioBackend) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	value, err := s.Config.Credentials.Get()
	if err != nil {
		return nil, err
	}

	endpointURL, err := url.Parse(s.Endpoint)
	if err != nil {
		return nil, err
	}
	madminClient, err := madmin.New(endpointURL.Host, value.AccessKeyID, value.SecretAccessKey, endpointURL.Scheme == "https")
	if err != nil {
		return nil, err
	}
	resp, err := madminClient.ExecuteMethod(context.Background(), "GET", madmin.RequestData{RelPath: "/v3/datausageinfo"})
	if err != nil {
		return nil, err
	}
	// Always release the response body so the underlying connection can
	// be reused and is not leaked.
	defer resp.Body.Close()
	response, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	var result BucketsUsage
	// Previously this error was ignored, turning any malformed response
	// into a silent "size 0" answer.
	if err := json.Unmarshal(response, &result); err != nil {
		return nil, err
	}
	return &GetBucketUsageOutput{Size: result.BucketsSizes[s.Bucket()]}, nil
}
26 changes: 15 additions & 11 deletions internal/backend_s3.go
Original file line number Diff line number Diff line change
@@ -55,15 +55,15 @@ type S3Backend struct {
gcs bool
v2Signer bool

iam bool
iamToken atomic.Value
iam bool
iamToken atomic.Value
iamTokenExpiration time.Time
iamRefreshTimer *time.Timer
iamRefreshTimer *time.Timer
}

func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, error) {
if config.MultipartCopyThreshold == 0 {
config.MultipartCopyThreshold = 128*1024*1024
config.MultipartCopyThreshold = 128 * 1024 * 1024
}
awsConfig, err := config.ToAwsConfig(flags)
if err != nil {
@@ -100,8 +100,8 @@ func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, err
}

type IAMCredResponse struct {
Code string
Token string
Code string
Token string
Expiration time.Time
}

@@ -128,11 +128,11 @@ func (s *S3Backend) TryIAM() error {
s.iam = true
s.iamToken.Store(creds.Token)
s.iamTokenExpiration = creds.Expiration
ttl := s.iamTokenExpiration.Sub(time.Now().Add(5*time.Minute))
ttl := s.iamTokenExpiration.Sub(time.Now().Add(5 * time.Minute))
if ttl < 0 {
ttl = s.iamTokenExpiration.Sub(time.Now())
if ttl >= 30*time.Second {
ttl = 30*time.Second
ttl = 30 * time.Second
}
}
s.iamRefreshTimer = time.AfterFunc(ttl, func() {
@@ -997,7 +997,7 @@ func (s *S3Backend) MultipartBlobAdd(param *MultipartBlobAddInput) (*MultipartBl

return &MultipartBlobAddOutput{
RequestId: s.getRequestId(req),
PartId: resp.ETag,
PartId: resp.ETag,
}, nil
}

@@ -1006,7 +1006,7 @@ func (s *S3Backend) MultipartBlobCopy(param *MultipartBlobCopyInput) (*Multipart
Bucket: &s.bucket,
Key: param.Commit.Key,
PartNumber: aws.Int64(int64(param.PartNumber)),
CopySource: aws.String(pathEscape(s.bucket+"/"+param.CopySource)),
CopySource: aws.String(pathEscape(s.bucket + "/" + param.CopySource)),
UploadId: param.Commit.UploadId,
}
if param.Size != 0 {
@@ -1028,7 +1028,7 @@ func (s *S3Backend) MultipartBlobCopy(param *MultipartBlobCopyInput) (*Multipart

return &MultipartBlobCopyOutput{
RequestId: s.getRequestId(req),
PartId: resp.CopyPartResult.ETag,
PartId: resp.CopyPartResult.ETag,
}, nil
}

@@ -1172,3 +1172,7 @@ func (s *S3Backend) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error
// Delegate exposes the concrete S3 backend so callers can unwrap any
// wrapping layers and reach the underlying implementation.
func (s *S3Backend) Delegate() interface{} {
	return s
}

// GetBucketUsage is a stub for plain S3: generic S3 has no cheap usage
// query, so the reported size is always zero.
func (s *S3Backend) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
	var usage GetBucketUsageOutput
	return &usage, nil
}
Loading