diff --git a/pkg/config/app/storage.go b/pkg/config/app/storage.go index b74fd1f7..23439e45 100644 --- a/pkg/config/app/storage.go +++ b/pkg/config/app/storage.go @@ -89,6 +89,7 @@ type MinioConfig struct { Bucket string // bucket name Credentials Credentials // access credentials Insecure bool // set to true to disable SSL + FlatFSKeys bool // use FlatFS key adapter } // Credentials configures access credentials for Minio. diff --git a/pkg/config/repo.go b/pkg/config/repo.go index aede95de..224f13e6 100644 --- a/pkg/config/repo.go +++ b/pkg/config/repo.go @@ -18,6 +18,7 @@ type MinioConfig struct { Bucket string `mapstructure:"bucket" validate:"required" toml:"bucket"` Credentials Credentials `mapstructure:"credentials" toml:"credentials,omitempty"` Insecure bool `mapstructure:"insecure" toml:"insecure,omitempty"` + FlatFSKeys bool `mapstructure:"flatfs_keys" toml:"flatfs_keys,omitempty"` } // BlobStorageConfig is special configuration allowing blobs to be stored @@ -50,6 +51,7 @@ func (r RepoConfig) ToAppConfig() (app.StorageConfig, error) { Bucket: r.BlobStorage.Minio.Bucket, Credentials: app.Credentials(r.BlobStorage.Minio.Credentials), Insecure: r.BlobStorage.Minio.Insecure, + FlatFSKeys: r.BlobStorage.Minio.FlatFSKeys, } } diff --git a/pkg/fx/store/filesystem/provider.go b/pkg/fx/store/filesystem/provider.go index 4e543aaf..75140e1a 100644 --- a/pkg/fx/store/filesystem/provider.go +++ b/pkg/fx/store/filesystem/provider.go @@ -21,6 +21,7 @@ import ( "github.com/storacha/piri/pkg/store/claimstore" "github.com/storacha/piri/pkg/store/delegationstore" "github.com/storacha/piri/pkg/store/keystore" + "github.com/storacha/piri/pkg/store/objectstore" "github.com/storacha/piri/pkg/store/objectstore/flatfs" minio_store "github.com/storacha/piri/pkg/store/objectstore/minio" "github.com/storacha/piri/pkg/store/receiptstore" @@ -272,10 +273,17 @@ func NewPDPStore(cfg app.PDPStoreConfig, lc fx.Lifecycle) (blobstore.PDPStore, e if creds.AccessKeyID != "" && creds.SecretAccessKey != "" { options.Creds = credentials.NewStaticV4(creds.AccessKeyID, creds.SecretAccessKey, "") } + var objStore objectstore.Store objStore, err := minio_store.New(cfg.Minio.Endpoint, cfg.Minio.Bucket, options) if err != nil { return nil, fmt.Errorf("creating pdp object store: %w", err) } + if cfg.Minio.FlatFSKeys { + objStore, err = flatfs.NewFlatFSKeyAdapter(context.Background(), objStore, flatfs.NextToLast(2)) + if err != nil { + return nil, fmt.Errorf("creating flatfs key adapter for pdp object store: %w", err) + } + } return blobstore.NewObjectBlobstore(objStore), nil } diff --git a/pkg/store/objectstore/flatfs/adapter.go b/pkg/store/objectstore/flatfs/adapter.go new file mode 100644 index 00000000..405e8958 --- /dev/null +++ b/pkg/store/objectstore/flatfs/adapter.go @@ -0,0 +1,84 @@ +package flatfs + +import ( + "context" + "fmt" + "io" + "path/filepath" + "strings" + + "github.com/storacha/piri/pkg/store/objectstore" +) + +type FlatFSKeyAdapterStore struct { + store objectstore.Store + shardStr string + getDir ShardFunc +} + +var _ objectstore.Store = (*FlatFSKeyAdapterStore)(nil) + +func NewFlatFSKeyAdapter(ctx context.Context, store objectstore.Store, fun *ShardIdV1) (*FlatFSKeyAdapterStore, error) { + obj, err := store.Get(ctx, SHARDING_FN) + if err != nil { + if err != objectstore.ErrNotExist { + return nil, fmt.Errorf("getting sharding info: %w", err) + } + data := fun.String() + "\n" + err := store.Put(ctx, SHARDING_FN, uint64(len(data)), io.NopCloser(strings.NewReader(data))) + if err != nil { + return nil, fmt.Errorf("storing sharding info: %w", err) + } + } else { + body := obj.Body() + defer body.Close() + bytes, err := io.ReadAll(body) + if err != nil { + return nil, fmt.Errorf("reading sharding info: %w", err) + } + id, err := ParseShardFunc(string(bytes)) + if err != nil { + return nil, fmt.Errorf("parsing existing sharding info: %w", err) + } + if id.String() != fun.String() { + return nil, fmt.Errorf("sharding function mismatch: store has %q but adapter configured with %q", id.String(), fun.String()) + } + } + return &FlatFSKeyAdapterStore{ + store: store, + shardStr: fun.String(), + getDir: fun.Func(), + }, nil +} + +func (f *FlatFSKeyAdapterStore) Delete(ctx context.Context, key string) error { + // Can't exist in datastore. + if !keyIsValid(key) { + return nil + } + _, file := f.encode(key) + return f.store.Delete(ctx, file) +} + +func (f *FlatFSKeyAdapterStore) encode(key string) (dir, file string) { + dir = f.getDir(key) + file = filepath.Join(dir, key+extension) + return dir, file +} + +func (f *FlatFSKeyAdapterStore) Get(ctx context.Context, key string, opts ...objectstore.GetOption) (objectstore.Object, error) { + // Can't exist in datastore. + if !keyIsValid(key) { + return nil, objectstore.ErrNotExist + } + _, file := f.encode(key) + return f.store.Get(ctx, file, opts...) +} + +func (f *FlatFSKeyAdapterStore) Put(ctx context.Context, key string, size uint64, data io.Reader) error { + if !keyIsValid(key) { + return fmt.Errorf("when putting %q: %w", key, ErrInvalidKey) + } + _, file := f.encode(key) + return f.store.Put(ctx, file, size, data) +} diff --git a/pkg/store/objectstore/flatfs/adapter_test.go b/pkg/store/objectstore/flatfs/adapter_test.go new file mode 100644 index 00000000..ed205cc0 --- /dev/null +++ b/pkg/store/objectstore/flatfs/adapter_test.go @@ -0,0 +1,50 @@ +package flatfs_test + +import ( + "io" + "strings" + "testing" + + "github.com/storacha/piri/pkg/store/objectstore/flatfs" + "github.com/storacha/piri/pkg/store/objectstore/memory" + "github.com/stretchr/testify/require" +) + +func TestFlatFSKeyAdapterStore(t *testing.T) { + fun := flatfs.NextToLast(2) + memStore := memory.NewStore() + flatfsMemStore, err := flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun) + require.NoError(t, err) + + key := "ciqjkmoqchcaod3rhy26uo57r2ktpxq4lnmhye6njse7l2rkhoetrjy" + value := "hello world" + + err = flatfsMemStore.Put(t.Context(), key, uint64(len(value)), strings.NewReader(value)) + require.NoError(t, err) + + // ensure SHARDING file was put + _, err = memStore.Get(t.Context(), flatfs.SHARDING_FN) + require.NoError(t, err) + + obj, err := flatfsMemStore.Get(t.Context(), key) + require.NoError(t, err) + body := obj.Body() + + b, err := io.ReadAll(body) + require.NoError(t, err) + body.Close() + require.Equal(t, value, string(b)) + + // check it was stored at the right path in the underlying store + _, err = memStore.Get(t.Context(), "rj/"+key+".data") + require.NoError(t, err) + + // ensure can create from existing + _, err = flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun) + require.NoError(t, err) + + // ensure error when sharding function does not match + _, err = flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, flatfs.NextToLast(3)) + require.Error(t, err) + require.ErrorContains(t, err, "sharding function mismatch") +}