Achieve capacity limits #41

Open · wants to merge 1 commit into master

Changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@ goofys
goofys.test
xout
s3proxy.jar
.vscode
10 changes: 7 additions & 3 deletions api/common/config.go
@@ -27,7 +27,7 @@ import (
)

type PartSizeConfig struct {
PartSize uint64
PartSize uint64
PartCount uint64
}

@@ -45,8 +45,12 @@ type FlagStorage struct {

// Common Backend Config
UseContentType bool
Endpoint string
Backend interface{}

Provider string
Capacity uint64
DiskUsageInterval uint64
Endpoint string
Backend interface{}

// Tuning
MemoryLimit uint64
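
A note on the three new fields: Provider selects a provider-specific backend ("minio" enables the MinIO admin API path below), Capacity is the quota in bytes, and DiskUsageInterval is the usage poll period in seconds. A minimal sketch of how they might be populated directly (field names from this diff, values hypothetical):

```go
package main

import . "github.com/yandex-cloud/geesefs/api/common"

func exampleFlags() *FlagStorage {
	return &FlagStorage{
		Provider:          "minio",  // routes NewBackend to the MinIO variant
		Capacity:          10 << 30, // 10 GiB quota for the mount
		DiskUsageInterval: 60,       // poll bucket usage every 60 seconds
	}
}
```
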
7 changes: 2 additions & 5 deletions go.mod
@@ -16,24 +16,21 @@ require (
github.com/aws/aws-sdk-go v1.38.7
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.2
github.com/gopherjs/gopherjs v0.0.0-20210202160940-bed99a852dfe // indirect
github.com/jacobsa/fuse v0.0.0-20210818065549-10d864429bf7
github.com/jtolds/gls v4.2.0+incompatible // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/kr/pretty v0.1.1-0.20190720101428-71e7e4993750 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190805055040-f9202b1cfdeb // indirect
github.com/minio/madmin-go v1.4.6
github.com/mitchellh/go-homedir v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/sevlyar/go-daemon v0.1.5
github.com/shirou/gopsutil v0.0.0-20190731134726-d80c43f9c984
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/assertions v0.0.0-20160201214316-443d812296a8 // indirect
github.com/smartystreets/goconvey v1.6.1-0.20160119221636-995f5b2e021c // indirect
github.com/urfave/cli v1.21.1-0.20190807111034-521735b7608a
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
google.golang.org/api v0.49.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
gopkg.in/ini.v1 v1.46.0
gopkg.in/ini.v1 v1.62.0
)

replace github.com/aws/aws-sdk-go => ./s3ext
163 changes: 163 additions & 0 deletions go.sum

Large diffs are not rendered by default.

26 changes: 22 additions & 4 deletions internal/backend.go
@@ -27,7 +27,7 @@ import (
)

type Capabilities struct {
MaxMultipartSize uint64
MaxMultipartSize uint64
// indicates that the blob store has native support for directories
DirBlob bool
Name string
@@ -44,7 +44,7 @@ type BlobItemOutput struct {
Size uint64
StorageClass *string
// may be nil in list responses for backends that don't return metadata in listings
Metadata map[string]*string
Metadata map[string]*string
}

type HeadBlobOutput struct {
@@ -179,7 +179,7 @@ type MultipartBlobAddInput struct {

type MultipartBlobAddOutput struct {
RequestId string
PartId *string
PartId *string
}

type MultipartBlobCopyInput struct {
@@ -192,7 +192,7 @@ type MultipartBlobCopyInput struct {

type MultipartBlobCopyOutput struct {
RequestId string
PartId *string
PartId *string
}

type MultipartBlobCommitOutput struct {
@@ -228,6 +228,14 @@ type MakeBucketOutput struct {
RequestId string
}

type GetBucketUsageInput struct {
}

type GetBucketUsageOutput struct {
Size uint64
RequestId string
}

/// Implementations of all the functions here are expected to be
/// concurrency-safe, except for
///
@@ -256,6 +264,7 @@ type StorageBackend interface {
MultipartExpire(param *MultipartExpireInput) (*MultipartExpireOutput, error)
RemoveBucket(param *RemoveBucketInput) (*RemoveBucketOutput, error)
MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error)
GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error)
Delegate() interface{}
}

@@ -424,6 +433,11 @@ func (s *StorageBackendInitWrapper) MakeBucket(param *MakeBucketInput) (*MakeBuc
return s.StorageBackend.MakeBucket(param)
}

func (s *StorageBackendInitWrapper) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
s.Init("")
return s.StorageBackend.GetBucketUsage(param)
}

type StorageBackendInitError struct {
error
cap Capabilities
@@ -547,3 +561,7 @@ func (e StorageBackendInitError) RemoveBucket(param *RemoveBucketInput) (*Remove
func (e StorageBackendInitError) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
return nil, e
}

func (e StorageBackendInitError) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return nil, e
}
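
Backends without a native usage API (ADLv1/v2, Azure Blob, GCS, plain S3 below) stub GetBucketUsage to Size: 0, which leaves the cached usage at zero and effectively disables the quota for them. A hypothetical helper (not part of this diff) showing how a caller could turn the new method into bytes remaining under the quota:

```go
// remainingCapacity is illustrative only; it assumes a StorageBackend
// whose GetBucketUsage reports total bucket usage in bytes.
func remainingCapacity(b StorageBackend, capacity uint64) (uint64, error) {
	usage, err := b.GetBucketUsage(&GetBucketUsageInput{})
	if err != nil {
		return 0, err
	}
	if usage.Size >= capacity {
		return 0, nil // quota exhausted
	}
	return capacity - usage.Size, nil
}
```
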
8 changes: 6 additions & 2 deletions internal/backend_adlv1.go
@@ -152,8 +152,8 @@ func NewADLv1(bucket string, flags *FlagStorage, config *ADLv1Config) (*ADLv1, e
bucket: bucket,
cap: Capabilities{
//NoParallelMultipart: true,
DirBlob: true,
Name: "adl",
DirBlob: true,
Name: "adl",
// ADLv1 fails with 404 if we upload data
// larger than 30000000 bytes (28.6MB) (28MB
// also failed in at one point, but as of
@@ -701,3 +701,7 @@ func (b *ADLv1) mkdir(dir string) error {
}
return nil
}

func (b *ADLv1) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
4 changes: 4 additions & 0 deletions internal/backend_adlv2.go
@@ -207,6 +207,10 @@ func (b *ADLv2) Capabilities() *Capabilities {
return &b.cap
}

func (b *ADLv2) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}

type ADL2Error struct {
adl2.DataLakeStorageError
}
10 changes: 7 additions & 3 deletions internal/backend_azblob.go
@@ -222,7 +222,7 @@ func (b *AZBlob) refreshToken() (*azblob.ContainerURL, error) {
// our token totally expired, renew inline before using it
b.mu.Unlock()
b.tokenRenewGate <- 1
defer func() { <- b.tokenRenewGate } ()
defer func() { <-b.tokenRenewGate }()

b.mu.Lock()
// check again, because in the mean time maybe it's renewed
@@ -248,7 +248,7 @@ func (b *AZBlob) refreshToken() (*azblob.ContainerURL, error) {
if err != nil {
azbLog.Errorf("Unable to refresh token: %v", err)
}
<- b.tokenRenewGate
<-b.tokenRenewGate
}()

// if we cannot renew token, treat it as a
@@ -665,7 +665,7 @@ func (b *AZBlob) DeleteBlobs(param *DeleteBlobsInput) (ret *DeleteBlobsOutput, d

go func(key string) {
defer func() {
<- SmallActionsGate
<-SmallActionsGate
wg.Done()
}()

@@ -947,3 +947,7 @@ func (b *AZBlob) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
}
return &MakeBucketOutput{}, nil
}

func (b *AZBlob) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
16 changes: 10 additions & 6 deletions internal/backend_gcs3.go
@@ -23,15 +23,15 @@ import (
"sync"
"syscall"

"github.com/jacobsa/fuse"
"cloud.google.com/go/storage"
"github.com/jacobsa/fuse"
"google.golang.org/api/iterator"
)

// GCS variant of S3
type GCS3 struct {
*S3Backend
gcs *storage.Client
gcs *storage.Client
jsonCredFile string
}

@@ -103,12 +103,12 @@ func (s *GCS3) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
})
} else {
items = append(items, BlobItemOutput{
Key: &attrs.Name,
ETag: &attrs.Etag,
Key: &attrs.Name,
ETag: &attrs.Etag,
LastModified: &attrs.Updated,
Size: uint64(attrs.Size),
Size: uint64(attrs.Size),
StorageClass: &attrs.StorageClass,
Metadata: PMetadata(attrs.Metadata),
Metadata: PMetadata(attrs.Metadata),
})
}
n++
@@ -156,3 +156,7 @@ func (s *GCS3) DeleteBlobs(param *DeleteBlobsInput) (*DeleteBlobsOutput, error)
func (s *GCS3) MultipartBlobCopy(param *MultipartBlobCopyInput) (*MultipartBlobCopyOutput, error) {
return nil, syscall.ENOSYS
}

func (s *GCS3) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
68 changes: 68 additions & 0 deletions internal/backend_minio.go
@@ -0,0 +1,68 @@
// Copyright 2019 Ka-Hing Cheung
// Copyright 2021 Yandex LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"
"encoding/json"
"io/ioutil"
"net/url"

"github.com/minio/madmin-go"
. "github.com/yandex-cloud/geesefs/api/common"
)

type BucketsUsage struct {
BucketsSizes map[string]uint64
}

type MinioBackend struct {
*S3Backend
}

func NewMinio(bucket string, flags *FlagStorage, config *S3Config) (*MinioBackend, error) {
s3Backend, err := NewS3(bucket, flags, config)
if err != nil {
return nil, err
}
s3Backend.Capabilities().Name = "minio"
s := &MinioBackend{S3Backend: s3Backend}
return s, nil
}

func (s *MinioBackend) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
value, err := s.Config.Credentials.Get()
if err != nil {
return nil, err
}

endpointURL, err := url.Parse(s.Endpoint)
if err != nil {
return nil, err
}
madminClient, err := madmin.New(endpointURL.Host, value.AccessKeyID, value.SecretAccessKey, endpointURL.Scheme == "https")
if err != nil {
return nil, err
}
resp, err := madminClient.ExecuteMethod(context.Background(), "GET", madmin.RequestData{RelPath: "/v3/datausageinfo"})
if err != nil {
return nil, err
}
defer resp.Body.Close()
response, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var result BucketsUsage
if err := json.Unmarshal(response, &result); err != nil {
return nil, err
}
return &GetBucketUsageOutput{Size: result.BucketsSizes[s.Bucket()]}, nil
}
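
Worth noting: madmin-go also appears to expose a typed wrapper around the same /v3/datausageinfo admin endpoint, which would avoid the manual JSON decoding. Unverified against the pinned madmin-go v1.4.6, so treat the method name and field below as assumptions:

```go
// Hypothetical alternative, assuming madmin-go provides
//   func (adm *AdminClient) DataUsageInfo(ctx context.Context) (DataUsageInfo, error)
// with a per-bucket size map.
func bucketUsageViaTypedAPI(adm *madmin.AdminClient, bucket string) (uint64, error) {
	info, err := adm.DataUsageInfo(context.Background())
	if err != nil {
		return 0, err
	}
	return info.BucketSizes[bucket], nil
}
```
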
26 changes: 15 additions & 11 deletions internal/backend_s3.go
@@ -55,15 +55,15 @@ type S3Backend struct {
gcs bool
v2Signer bool

iam bool
iamToken atomic.Value
iam bool
iamToken atomic.Value
iamTokenExpiration time.Time
iamRefreshTimer *time.Timer
iamRefreshTimer *time.Timer
}

func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, error) {
if config.MultipartCopyThreshold == 0 {
config.MultipartCopyThreshold = 128*1024*1024
config.MultipartCopyThreshold = 128 * 1024 * 1024
}
awsConfig, err := config.ToAwsConfig(flags)
if err != nil {
@@ -100,8 +100,8 @@ func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, err
}

type IAMCredResponse struct {
Code string
Token string
Code string
Token string
Expiration time.Time
}

@@ -128,11 +128,11 @@ func (s *S3Backend) TryIAM() error {
s.iam = true
s.iamToken.Store(creds.Token)
s.iamTokenExpiration = creds.Expiration
ttl := s.iamTokenExpiration.Sub(time.Now().Add(5*time.Minute))
ttl := s.iamTokenExpiration.Sub(time.Now().Add(5 * time.Minute))
if ttl < 0 {
ttl = s.iamTokenExpiration.Sub(time.Now())
if ttl >= 30*time.Second {
ttl = 30*time.Second
ttl = 30 * time.Second
}
}
s.iamRefreshTimer = time.AfterFunc(ttl, func() {
@@ -997,7 +997,7 @@ func (s *S3Backend) MultipartBlobAdd(param *MultipartBlobAddInput) (*MultipartBl

return &MultipartBlobAddOutput{
RequestId: s.getRequestId(req),
PartId: resp.ETag,
PartId: resp.ETag,
}, nil
}

@@ -1006,7 +1006,7 @@ func (s *S3Backend) MultipartBlobCopy(param *MultipartBlobCopyInput) (*Multipart
Bucket: &s.bucket,
Key: param.Commit.Key,
PartNumber: aws.Int64(int64(param.PartNumber)),
CopySource: aws.String(pathEscape(s.bucket+"/"+param.CopySource)),
CopySource: aws.String(pathEscape(s.bucket + "/" + param.CopySource)),
UploadId: param.Commit.UploadId,
}
if param.Size != 0 {
@@ -1028,7 +1028,7 @@ func (s *S3Backend) MultipartBlobCopy(param *MultipartBlobCopyInput) (*Multipart

return &MultipartBlobCopyOutput{
RequestId: s.getRequestId(req),
PartId: resp.CopyPartResult.ETag,
PartId: resp.CopyPartResult.ETag,
}, nil
}

@@ -1172,3 +1172,7 @@ func (s *S3Backend) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error
func (s *S3Backend) Delegate() interface{} {
return s
}

func (s *S3Backend) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
165 changes: 93 additions & 72 deletions internal/flags.go
@@ -90,13 +90,31 @@ func NewApp() (app *cli.App) {
// File system
/////////////////////////

cli.StringFlag{
Name: "provider",
Value: "unknown",
Usage: "The specific provider of the object storage service.",
},

cli.Uint64Flag{
Name: "capacity",
Value: 1 * 1024 * 1024 * 1024 * 1024 * 1024,
Usage: "Maximum capacity quota for the mounted bucket, in bytes (default: 1 PB).",
},

cli.Uint64Flag{
Name: "disk-usage-interval",
Value: 60,
Usage: "Interval between bucket disk usage polls, in seconds (default: 60).",
},

cli.StringSliceFlag{
Name: "o",
Usage: "Additional system-specific mount options. Be careful!",
},

cli.StringFlag{
Name: "cache",
Name: "cache",
Usage: "Directory to use for data cache. (default: off)",
},

@@ -286,20 +304,20 @@ func NewApp() (app *cli.App) {
cli.IntFlag{
Name: "max-parallel-parts",
Value: 8,
Usage: "How much parallel requests out of the total number can be used for large part uploads."+
Usage: "How much parallel requests out of the total number can be used for large part uploads." +
" Large parts take more bandwidth so they usually require less parallelism (default: 8)",
},

cli.IntFlag{
Name: "max-parallel-copy",
Value: 16,
Usage: "How much parallel unmodified part copy requests should be used."+
Usage: "How much parallel unmodified part copy requests should be used." +
" This limit is separate from max-flushers (default: 16)",
},

cli.IntFlag{
Name: "read-ahead",
Value: 5*1024,
Value: 5 * 1024,
Usage: "How much data in KB should be pre-loaded with every read by default (default: 5 MB)",
},

@@ -323,19 +341,19 @@ func NewApp() (app *cli.App) {

cli.IntFlag{
Name: "large-read-cutoff",
Value: 20*1024,
Value: 20 * 1024,
Usage: "Amount of linear read in KB after which the \"large\" readahead should be triggered (default: 20 MB)",
},

cli.IntFlag{
Name: "read-ahead-large",
Value: 100*1024,
Value: 100 * 1024,
Usage: "Larger readahead size in KB to be used when long linear reads are detected (default: 100 MB)",
},

cli.IntFlag{
Name: "read-ahead-parallel",
Value: 20*1024,
Value: 20 * 1024,
Usage: "Larger readahead will be triggered in parallel chunks of this size in KB (default: 20 MB)",
},

@@ -356,7 +374,7 @@ func NewApp() (app *cli.App) {
cli.StringFlag{
Name: "part-sizes",
Value: "5:1000,25:1000,125",
Usage: "Part sizes in MB. Total part count is always 10000 in S3."+
Usage: "Part sizes in MB. Total part count is always 10000 in S3." +
" Default is 1000 5 MB parts, then 1000 25 MB parts" +
" and then 125 MB for the rest of parts",
},
@@ -365,7 +383,7 @@ func NewApp() (app *cli.App) {
Name: "max-merge-copy",
Value: 0,
Usage: "If non-zero, allow to compose larger parts up to this number of megabytes" +
" in size from existing unchanged parts when doing server-side part copy."+
" in size from existing unchanged parts when doing server-side part copy." +
" Must be left at 0 for Yandex S3 (default: 0)",
},

@@ -473,7 +491,7 @@ func NewApp() (app *cli.App) {
Usage: "Mount an S3 bucket locally",
HideHelp: true,
Writer: os.Stderr,
Flags: append(append(append(append([]cli.Flag{
Flags: append(append(append(append([]cli.Flag{
cli.BoolFlag{
Name: "help, h",
Usage: "Print this help text and exit successfully.",
@@ -558,7 +576,7 @@ func parsePartSizes(s string) (result []PartSizeConfig) {
if pi < len(partSizes)-1 {
panic("Part count may be omitted only for the last interval")
}
count = 10000-totalCount
count = 10000 - totalCount
}
totalCount += count
if totalCount > 10000 {
@@ -571,7 +589,7 @@ func parsePartSizes(s string) (result []PartSizeConfig) {
panic("Maximum part size is 5 GB")
}
result = append(result, PartSizeConfig{
PartSize: size*1024*1024,
PartSize: size * 1024 * 1024,
PartCount: count,
})
}
@@ -588,55 +606,58 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {

flags := &FlagStorage{
// File system
MountOptions: make(map[string]string),
DirMode: os.FileMode(c.Int("dir-mode")),
FileMode: os.FileMode(c.Int("file-mode")),
Uid: uint32(c.Int("uid")),
Gid: uint32(c.Int("gid")),
MountOptions: make(map[string]string),
DirMode: os.FileMode(c.Int("dir-mode")),
FileMode: os.FileMode(c.Int("file-mode")),
Uid: uint32(c.Int("uid")),
Gid: uint32(c.Int("gid")),

// Tuning,
MemoryLimit: uint64(1024*1024*c.Int("memory-limit")),
GCInterval: uint64(1024*1024*c.Int("gc-interval")),
Cheap: c.Bool("cheap"),
ExplicitDir: c.Bool("no-implicit-dir"),
NoDirObject: c.Bool("no-dir-object"),
MaxFlushers: int64(c.Int("max-flushers")),
MaxParallelParts: c.Int("max-parallel-parts"),
MaxParallelCopy: c.Int("max-parallel-copy"),
StatCacheTTL: c.Duration("stat-cache-ttl"),
HTTPTimeout: c.Duration("http-timeout"),
RetryInterval: c.Duration("retry-interval"),
ReadAheadKB: uint64(c.Int("read-ahead")),
SmallReadCount: uint64(c.Int("small-read-count")),
SmallReadCutoffKB: uint64(c.Int("small-read-cutoff")),
ReadAheadSmallKB: uint64(c.Int("read-ahead-small")),
LargeReadCutoffKB: uint64(c.Int("large-read-cutoff")),
ReadAheadLargeKB: uint64(c.Int("read-ahead-large")),
ReadAheadParallelKB: uint64(c.Int("read-ahead-parallel")),
ReadMergeKB: uint64(c.Int("read-merge")),
SinglePartMB: uint64(singlePart),
MaxMergeCopyMB: uint64(c.Int("max-merge-copy")),
IgnoreFsync: c.Bool("ignore-fsync"),
SymlinkAttr: c.String("symlink-attr"),
CachePopularThreshold: int64(c.Int("cache-popular-threshold")),
CacheMaxHits: int64(c.Int("cache-max-hits")),
CacheAgeInterval: int64(c.Int("cache-age-interval")),
CacheAgeDecrement: int64(c.Int("cache-age-decrement")),
CacheToDiskHits: int64(c.Int("cache-to-disk-hits")),
CachePath: c.String("cache"),
MaxDiskCacheFD: int64(c.Int("max-disk-cache-fd")),
CacheFileMode: os.FileMode(c.Int("cache-file-mode")),
MemoryLimit: uint64(1024 * 1024 * c.Int("memory-limit")),
GCInterval: uint64(1024 * 1024 * c.Int("gc-interval")),
Cheap: c.Bool("cheap"),
ExplicitDir: c.Bool("no-implicit-dir"),
NoDirObject: c.Bool("no-dir-object"),
MaxFlushers: int64(c.Int("max-flushers")),
MaxParallelParts: c.Int("max-parallel-parts"),
MaxParallelCopy: c.Int("max-parallel-copy"),
StatCacheTTL: c.Duration("stat-cache-ttl"),
HTTPTimeout: c.Duration("http-timeout"),
RetryInterval: c.Duration("retry-interval"),
ReadAheadKB: uint64(c.Int("read-ahead")),
SmallReadCount: uint64(c.Int("small-read-count")),
SmallReadCutoffKB: uint64(c.Int("small-read-cutoff")),
ReadAheadSmallKB: uint64(c.Int("read-ahead-small")),
LargeReadCutoffKB: uint64(c.Int("large-read-cutoff")),
ReadAheadLargeKB: uint64(c.Int("read-ahead-large")),
ReadAheadParallelKB: uint64(c.Int("read-ahead-parallel")),
ReadMergeKB: uint64(c.Int("read-merge")),
SinglePartMB: uint64(singlePart),
MaxMergeCopyMB: uint64(c.Int("max-merge-copy")),
IgnoreFsync: c.Bool("ignore-fsync"),
SymlinkAttr: c.String("symlink-attr"),
CachePopularThreshold: int64(c.Int("cache-popular-threshold")),
CacheMaxHits: int64(c.Int("cache-max-hits")),
CacheAgeInterval: int64(c.Int("cache-age-interval")),
CacheAgeDecrement: int64(c.Int("cache-age-decrement")),
CacheToDiskHits: int64(c.Int("cache-to-disk-hits")),
CachePath: c.String("cache"),
MaxDiskCacheFD: int64(c.Int("max-disk-cache-fd")),
CacheFileMode: os.FileMode(c.Int("cache-file-mode")),

// Common Backend Config
Endpoint: c.String("endpoint"),
UseContentType: c.Bool("use-content-type"),
Provider: c.String("provider"),
Capacity: c.Uint64("capacity"),
DiskUsageInterval: c.Uint64("disk-usage-interval"),
Endpoint: c.String("endpoint"),
UseContentType: c.Bool("use-content-type"),

// Debugging,
DebugMain: c.Bool("debug"),
DebugFuse: c.Bool("debug_fuse"),
DebugS3: c.Bool("debug_s3"),
Foreground: c.Bool("f"),
LogFile: c.String("log-file"),
DebugMain: c.Bool("debug"),
DebugFuse: c.Bool("debug_fuse"),
DebugS3: c.Bool("debug_s3"),
Foreground: c.Bool("f"),
LogFile: c.String("log-file"),
}

flags.PartSizes = parsePartSizes(c.String("part-sizes"))
@@ -645,25 +666,25 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
if flags.Backend == nil {
flags.Backend = (&S3Config{}).Init()
config, _ := flags.Backend.(*S3Config)
config.Region = c.String("region")
config.RegionSet = c.IsSet("region")
config.Region = c.String("region")
config.RegionSet = c.IsSet("region")
config.RequesterPays = c.Bool("requester-pays")
config.StorageClass = c.String("storage-class")
config.Profile = c.String("profile")
config.SharedConfig = c.StringSlice("shared-config")
config.UseSSE = c.Bool("sse")
config.UseKMS = c.IsSet("sse-kms")
config.KMSKeyID = c.String("sse-kms")
config.SseC = c.String("sse-c")
config.ACL = c.String("acl")
config.Subdomain = c.Bool("subdomain")
config.NoChecksum = c.Bool("no-checksum")
config.UseIAM = c.Bool("iam")
config.IAMHeader = c.String("iam-header")
config.MultipartAge = c.Duration("multipart-age")
config.StorageClass = c.String("storage-class")
config.Profile = c.String("profile")
config.SharedConfig = c.StringSlice("shared-config")
config.UseSSE = c.Bool("sse")
config.UseKMS = c.IsSet("sse-kms")
config.KMSKeyID = c.String("sse-kms")
config.SseC = c.String("sse-c")
config.ACL = c.String("acl")
config.Subdomain = c.Bool("subdomain")
config.NoChecksum = c.Bool("no-checksum")
config.UseIAM = c.Bool("iam")
config.IAMHeader = c.String("iam-header")
config.MultipartAge = c.Duration("multipart-age")
listType := c.String("list-type")
config.ListV1Ext = listType == "ext-v1"
config.ListV2 = listType == "2"
config.ListV1Ext = listType == "ext-v1"
config.ListV2 = listType == "2"

config.MultipartCopyThreshold = uint64(c.Int("multipart-copy-threshold")) * 1024 * 1024
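
Taken together, the new flags would be passed at mount time with something like `geesefs --provider minio --capacity 10737418240 --disk-usage-interval 60 mybucket /mnt/geesefs` (hypothetical invocation; flag names from this diff, values illustrative).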

119 changes: 80 additions & 39 deletions internal/goofys.go
@@ -36,8 +36,9 @@ import (
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"

"github.com/sirupsen/logrus"
"net/http"

"github.com/sirupsen/logrus"
)

// goofys is a Filey System written in Go. All the backend data is
@@ -66,8 +67,8 @@ type Goofys struct {
// from per-inode locks). Make sure to see the notes on lock ordering above.
mu sync.RWMutex

flusherMu sync.Mutex
flusherCond *sync.Cond
flusherMu sync.Mutex
flusherCond *sync.Cond
flushPending int32

// The next inode ID to hand out. We assume that this will never overflow,
@@ -91,24 +92,25 @@ type Goofys struct {
// Inflight changes are tracked to skip them in parallel listings
// Required because we don't have guarantees about listing & change ordering
inflightListingId int
inflightListings map[int]map[string]bool
inflightChanges map[string]int
inflightListings map[int]map[string]bool
inflightChanges map[string]int

nextHandleID fuseops.HandleID
dirHandles map[fuseops.HandleID]*DirHandle

fileHandles map[fuseops.HandleID]*FileHandle

activeFlushers int64
flushRetrySet int32
memRecency uint64
flushRetrySet int32
memRecency uint64

forgotCnt uint32

zeroBuf []byte
lfru *LFRU
diskFdMu sync.Mutex
diskFdCond *sync.Cond
zeroBuf []byte
lfru *LFRU
diskUsage uint64
diskFdMu sync.Mutex
diskFdCond *sync.Cond
diskFdCount int64
}

@@ -130,6 +132,8 @@ func NewBackend(bucket string, flags *FlagStorage) (cloud StorageBackend, err er
} else if config, ok := flags.Backend.(*S3Config); ok {
if strings.HasSuffix(flags.Endpoint, "/storage.googleapis.com") {
cloud, err = NewGCS3(bucket, flags, config)
} else if flags.Provider == "minio" {
cloud, err = NewMinio(bucket, flags, config)
} else {
cloud, err = NewS3(bucket, flags, config)
}
@@ -189,12 +193,12 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
newBackend func(string, *FlagStorage) (StorageBackend, error)) *Goofys {
// Set up the basic struct.
fs := &Goofys{
bucket: bucket,
flags: flags,
umask: 0122,
lfru: NewLFRU(flags.CachePopularThreshold, flags.CacheMaxHits, flags.CacheAgeInterval, flags.CacheAgeDecrement),
zeroBuf: make([]byte, 1048576),
inflightChanges: make(map[string]int),
bucket: bucket,
flags: flags,
umask: 0122,
lfru: NewLFRU(flags.CachePopularThreshold, flags.CacheMaxHits, flags.CacheAgeInterval, flags.CacheAgeDecrement),
zeroBuf: make([]byte, 1048576),
inflightChanges: make(map[string]int),
inflightListings: make(map[int]map[string]bool),
}

@@ -222,6 +226,18 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
}
_, fs.gcs = cloud.Delegate().(*GCS3)

go func(fs *Goofys, cloud StorageBackend) {
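// Poll bucket usage every DiskUsageInterval seconds so that StatFS
// and checkWriteAvailable see a recent value. On error the cached
// value resets to 0, which effectively lifts the quota until the
// next successful poll.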
for {
getBucketUsageOutput, err := cloud.GetBucketUsage(&GetBucketUsageInput{})
if err != nil {
fs.diskUsage = 0
} else {
fs.diskUsage = getBucketUsageOutput.Size
}
time.Sleep(time.Second * time.Duration(fs.flags.DiskUsageInterval))
}
}(fs, cloud)

randomObjectName := prefix + (RandStringBytesMaskImprSrc(32))
err = cloud.Init(randomObjectName)
if err != nil {
@@ -236,7 +252,7 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
Mtime: now,
}

fs.bufferPool = NewBufferPool(int64(flags.MemoryLimit), uint64(flags.GCInterval) << 20)
fs.bufferPool = NewBufferPool(int64(flags.MemoryLimit), uint64(flags.GCInterval)<<20)
fs.bufferPool.FreeSomeCleanBuffers = func(size int64) (int64, bool) {
return fs.FreeSomeCleanBuffers(size)
}
@@ -271,6 +287,10 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
return fs
}

func (fs *Goofys) checkWriteAvailable(size uint64) bool {
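// Best-effort quota gate: true while current usage plus the
// prospective write stays strictly below the configured capacity.
// Most call sites pass size 0; WriteFile passes only the write
// offset, so this is an approximation, not exact accounting.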
return fs.flags.Capacity > (fs.diskUsage + size)
}

// from https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-golang
func RandStringBytesMaskImprSrc(n int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789"
@@ -352,12 +372,12 @@ func (fs *Goofys) FreeSomeCleanBuffers(size int64) (int64, bool) {
haveDirty := false
// Free at least 5 MB
if size < 5*1024*1024 {
size = 5*1024*1024
size = 5 * 1024 * 1024
}
skipRecent := atomic.LoadUint64(&fs.memRecency)
// Avoid evicting at least 1/4 of recent memory allocations
if skipRecent > fs.flags.MemoryLimit/4 {
skipRecent -= fs.flags.MemoryLimit/4
skipRecent -= fs.flags.MemoryLimit / 4
} else {
skipRecent = 0
}
@@ -438,12 +458,12 @@ func (fs *Goofys) FreeSomeCleanBuffers(size int64) (int64, bool) {
// A flushed buffer can be removed at a cost of finalizing multipart upload
// to read it back later. However it's likely not a problem if we're uploading
// a large file because we may never need to read it back.
prev := del-1
prev := del - 1
if prev < 0 {
prev = i-1
prev = i - 1
}
if prev >= 0 && inode.buffers[prev].state == BUF_FL_CLEARED &&
buf.offset == (inode.buffers[prev].offset + inode.buffers[prev].length) {
buf.offset == (inode.buffers[prev].offset+inode.buffers[prev].length) {
inode.buffers[prev].length += buf.length
if del == -1 {
del = i
@@ -458,13 +478,13 @@ func (fs *Goofys) FreeSomeCleanBuffers(size int64) (int64, bool) {
haveDirty = true
}
if del >= 0 {
inode.buffers = append(inode.buffers[0 : del], inode.buffers[i : ]...)
inode.buffers = append(inode.buffers[0:del], inode.buffers[i:]...)
i = del
del = -1
}
}
if del >= 0 {
inode.buffers = append(inode.buffers[0 : del], inode.buffers[i : ]...)
inode.buffers = append(inode.buffers[0:del], inode.buffers[i:]...)
del = -1
}
inode.mu.Unlock()
@@ -686,16 +706,20 @@ func (fs *Goofys) StatFS(
op *fuseops.StatFSOp) (err error) {

const BLOCK_SIZE = 4096
const TOTAL_SPACE = 1 * 1024 * 1024 * 1024 * 1024 * 1024 // 1PB
const TOTAL_BLOCKS = TOTAL_SPACE / BLOCK_SIZE
const INODES = 1 * 1000 * 1000 * 1000 // 1 billion
TOTAL_BLOCKS := fs.flags.Capacity / BLOCK_SIZE
USAGE_BLOCKS := fs.diskUsage / BLOCK_SIZE

op.BlockSize = BLOCK_SIZE
op.Blocks = TOTAL_BLOCKS
op.BlocksFree = TOTAL_BLOCKS
op.BlocksAvailable = TOTAL_BLOCKS
op.IoSize = 1 * 1024 * 1024 // 1MB
op.Inodes = INODES
op.InodesFree = INODES
if USAGE_BLOCKS > TOTAL_BLOCKS {
op.BlocksFree = 0
} else {
op.BlocksFree = TOTAL_BLOCKS - USAGE_BLOCKS
}
op.BlocksAvailable = op.BlocksFree
op.IoSize = 1 * 1024 * 1024 // 1MB
op.Inodes = 1 * 1000 * 1000 * 1000 // 1 billion
op.InodesFree = 1 * 1000 * 1000 * 1000 // 1 billion
return
}
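
To make the block arithmetic above concrete, a small worked example with hypothetical numbers (a 10 GiB quota with 4 GiB used):

```go
const blockSize = 4096

func exampleStatFS() (totalBlocks, freeBlocks uint64) {
	capacity := uint64(10) << 30 // flags.Capacity: 10 GiB quota
	used := uint64(4) << 30      // fs.diskUsage from the poller
	totalBlocks = capacity / blockSize        // 2621440 blocks
	freeBlocks = totalBlocks - used/blockSize // 1572864 blocks free
	return totalBlocks, freeBlocks
}
```
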

@@ -810,6 +834,9 @@ func (fs *Goofys) RemoveXattr(ctx context.Context,

func (fs *Goofys) SetXattr(ctx context.Context,
op *fuseops.SetXattrOp) (err error) {
if !fs.checkWriteAvailable(0) {
return syscall.ENOSPC
}
fs.mu.RLock()
inode := fs.getInodeOrDie(op.Inode)
fs.mu.RUnlock()
@@ -830,6 +857,9 @@ func (fs *Goofys) SetXattr(ctx context.Context,

func (fs *Goofys) CreateSymlink(ctx context.Context,
op *fuseops.CreateSymlinkOp) (err error) {
if !fs.checkWriteAvailable(0) {
return syscall.ENOSPC
}
fs.mu.RLock()
parent := fs.getInodeOrDie(op.Parent)
fs.mu.RUnlock()
@@ -1371,7 +1401,9 @@ func (fs *Goofys) ReleaseFileHandle(
func (fs *Goofys) CreateFile(
ctx context.Context,
op *fuseops.CreateFileOp) (err error) {

if !fs.checkWriteAvailable(0) {
return syscall.ENOSPC
}
fs.mu.RLock()
parent := fs.getInodeOrDie(op.Parent)
fs.mu.RUnlock()
@@ -1411,9 +1443,11 @@ func (fs *Goofys) CreateFile(
func (fs *Goofys) MkNode(
ctx context.Context,
op *fuseops.MkNodeOp) (err error) {

if (op.Mode & os.ModeType) != os.ModeDir &&
(op.Mode & os.ModeType) != 0 {
if !fs.checkWriteAvailable(0) {
return syscall.ENOSPC
}
if (op.Mode&os.ModeType) != os.ModeDir &&
(op.Mode&os.ModeType) != 0 {
// Special files are not supported yet
return syscall.ENOTSUP
}
@@ -1451,6 +1485,9 @@ func (fs *Goofys) MkNode(
func (fs *Goofys) MkDir(
ctx context.Context,
op *fuseops.MkDirOp) (err error) {
if !fs.checkWriteAvailable(0) {
return syscall.ENOSPC
}

fs.mu.RLock()
parent := fs.getInodeOrDie(op.Parent)
@@ -1498,7 +1535,9 @@ func (fs *Goofys) RmDir(
func (fs *Goofys) SetInodeAttributes(
ctx context.Context,
op *fuseops.SetInodeAttributesOp) (err error) {

if !fs.checkWriteAvailable(0) {
return syscall.ENOSPC
}
fs.mu.RLock()
inode := fs.getInodeOrDie(op.Inode)
fs.mu.RUnlock()
@@ -1538,7 +1577,9 @@ func (fs *Goofys) SetInodeAttributes(
func (fs *Goofys) WriteFile(
ctx context.Context,
op *fuseops.WriteFileOp) (err error) {

if !fs.checkWriteAvailable(uint64(op.Offset)) {
return syscall.ENOSPC
}
fs.mu.RLock()

fh, ok := fs.fileHandles[op.Handle]