From ded6f60c2793f9c13855ce7c5483918aa0ec6930 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 22 Jan 2026 14:34:46 +0000 Subject: [PATCH 1/2] feat: add flatfs key adapter store --- pkg/config/app/storage.go | 1 + pkg/config/repo.go | 2 + pkg/fx/store/filesystem/provider.go | 8 ++ pkg/store/objectstore/flatfs/adapter.go | 84 ++++++++++++++++++++ pkg/store/objectstore/flatfs/adapter_test.go | 49 ++++++++++++ 5 files changed, 144 insertions(+) create mode 100644 pkg/store/objectstore/flatfs/adapter.go create mode 100644 pkg/store/objectstore/flatfs/adapter_test.go diff --git a/pkg/config/app/storage.go b/pkg/config/app/storage.go index b74fd1f7..23439e45 100644 --- a/pkg/config/app/storage.go +++ b/pkg/config/app/storage.go @@ -89,6 +89,7 @@ type MinioConfig struct { Bucket string // bucket name Credentials Credentials // access credentials Insecure bool // set to true to disable SSL + FlatFSKeys bool // use FlatFS key adapter } // Credentials configures access credentials for Minio. diff --git a/pkg/config/repo.go b/pkg/config/repo.go index aede95de..224f13e6 100644 --- a/pkg/config/repo.go +++ b/pkg/config/repo.go @@ -18,6 +18,7 @@ type MinioConfig struct { Bucket string `mapstructure:"bucket" validate:"required" toml:"bucket"` Credentials Credentials `mapstructure:"credentials" toml:"credentials,omitempty"` Insecure bool `mapstructure:"insecure" toml:"insecure,omitempty"` + FlatFSKeys bool `mapstructure:"flatfs_keys" toml:"flatfs_keys,omitempty"` } // BlobStorageConfig is special configuration allowing blobs to be stored @@ -50,6 +51,7 @@ func (r RepoConfig) ToAppConfig() (app.StorageConfig, error) { Bucket: r.BlobStorage.Minio.Bucket, Credentials: app.Credentials(r.BlobStorage.Minio.Credentials), Insecure: r.BlobStorage.Minio.Insecure, + FlatFSKeys: r.BlobStorage.Minio.FlatFSKeys, } } diff --git a/pkg/fx/store/filesystem/provider.go b/pkg/fx/store/filesystem/provider.go index 4e543aaf..75140e1a 100644 --- a/pkg/fx/store/filesystem/provider.go +++ b/pkg/fx/store/filesystem/provider.go @@ -21,6 +21,7 @@ import ( "github.com/storacha/piri/pkg/store/claimstore" "github.com/storacha/piri/pkg/store/delegationstore" "github.com/storacha/piri/pkg/store/keystore" + "github.com/storacha/piri/pkg/store/objectstore" "github.com/storacha/piri/pkg/store/objectstore/flatfs" minio_store "github.com/storacha/piri/pkg/store/objectstore/minio" "github.com/storacha/piri/pkg/store/receiptstore" @@ -272,10 +273,17 @@ func NewPDPStore(cfg app.PDPStoreConfig, lc fx.Lifecycle) (blobstore.PDPStore, e if creds.AccessKeyID != "" && creds.SecretAccessKey != "" { options.Creds = credentials.NewStaticV4(creds.AccessKeyID, creds.SecretAccessKey, "") } + var objStore objectstore.Store objStore, err := minio_store.New(cfg.Minio.Endpoint, cfg.Minio.Bucket, options) if err != nil { return nil, fmt.Errorf("creating pdp object store: %w", err) } + if cfg.Minio.FlatFSKeys { + objStore, err = flatfs.NewFlatFSKeyAdapter(context.Background(), objStore, flatfs.NextToLast(2)) + if err != nil { + return nil, fmt.Errorf("creating flatfs key adapter for pdp object store: %w", err) + } + } return blobstore.NewObjectBlobstore(objStore), nil } diff --git a/pkg/store/objectstore/flatfs/adapter.go b/pkg/store/objectstore/flatfs/adapter.go new file mode 100644 index 00000000..405e8958 --- /dev/null +++ b/pkg/store/objectstore/flatfs/adapter.go @@ -0,0 +1,84 @@ +package flatfs + +import ( + "context" + "fmt" + "io" + "path/filepath" + "strings" + + "github.com/storacha/piri/pkg/store/objectstore" +) + +type FlatFSKeyAdapterStore struct { + store objectstore.Store + shardStr string + getDir ShardFunc +} + +var _ objectstore.Store = (*FlatFSKeyAdapterStore)(nil) + +func NewFlatFSKeyAdapter(ctx context.Context, store objectstore.Store, fun *ShardIdV1) (*FlatFSKeyAdapterStore, error) { + obj, err := store.Get(ctx, SHARDING_FN) + if err != nil { + if err != objectstore.ErrNotExist { + return nil, fmt.Errorf("getting sharding info: %w", err) + } + data := fun.String() + "\n" + err := store.Put(ctx, SHARDING_FN, uint64(len(data)), io.NopCloser(strings.NewReader(data))) + if err != nil { + return nil, fmt.Errorf("storing sharding info: %w", err) + } + } else { + body := obj.Body() + defer body.Close() + bytes, err := io.ReadAll(body) + if err != nil { + return nil, fmt.Errorf("reading sharding info: %w", err) + } + id, err := ParseShardFunc(string(bytes)) + if err != nil { + return nil, fmt.Errorf("parsing existing sharding info: %w", err) + } + if id.String() != fun.String() { + return nil, fmt.Errorf("sharding function mismatch: store has %q but adapter configured with %q", id.String(), fun.String()) + } + } + return &FlatFSKeyAdapterStore{ + store: store, + shardStr: fun.String(), + getDir: fun.Func(), + }, nil +} + +func (f *FlatFSKeyAdapterStore) Delete(ctx context.Context, key string) error { + // Can't exist in datastore. + if !keyIsValid(key) { + return nil + } + _, file := f.encode(key) + return f.store.Delete(ctx, file) +} + +func (f *FlatFSKeyAdapterStore) encode(key string) (dir, file string) { + dir = f.getDir(key) + file = filepath.Join(dir, key+extension) + return dir, file +} + +func (f *FlatFSKeyAdapterStore) Get(ctx context.Context, key string, opts ...objectstore.GetOption) (objectstore.Object, error) { + // Can't exist in datastore. + if !keyIsValid(key) { + return nil, objectstore.ErrNotExist + } + _, file := f.encode(key) + return f.store.Get(ctx, file, opts...) +} + +func (f *FlatFSKeyAdapterStore) Put(ctx context.Context, key string, size uint64, data io.Reader) error { + if !keyIsValid(key) { + return fmt.Errorf("when putting %q: %w", key, ErrInvalidKey) + } + _, file := f.encode(key) + return f.store.Put(ctx, file, size, data) +} diff --git a/pkg/store/objectstore/flatfs/adapter_test.go b/pkg/store/objectstore/flatfs/adapter_test.go new file mode 100644 index 00000000..c2f26d8a --- /dev/null +++ b/pkg/store/objectstore/flatfs/adapter_test.go @@ -0,0 +1,49 @@ +package flatfs_test + +import ( + "io" + "strings" + "testing" + + "github.com/storacha/piri/pkg/store/objectstore/flatfs" + "github.com/storacha/piri/pkg/store/objectstore/memory" + "github.com/stretchr/testify/require" +) + +func TestFlatFSKeyAdapterStore(t *testing.T) { + fun := flatfs.NextToLast(2) + memStore := memory.NewStore() + flatfsMemStore, err := flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun) + + key := "ciqjkmoqchcaod3rhy26uo57r2ktpxq4lnmhye6njse7l2rkhoetrjy" + value := "hello world" + + err = flatfsMemStore.Put(t.Context(), key, uint64(len(value)), strings.NewReader(value)) + require.NoError(t, err) + + // ensure SHARDING file was put + _, err = memStore.Get(t.Context(), flatfs.SHARDING_FN) + require.NoError(t, err) + + obj, err := flatfsMemStore.Get(t.Context(), key) + require.NoError(t, err) + body := obj.Body() + + b, err := io.ReadAll(body) + require.NoError(t, err) + body.Close() + require.Equal(t, value, string(b)) + + // check it was stored at the right path in the underlying store + obj, err = memStore.Get(t.Context(), "rj/"+key+".data") + require.NoError(t, err) + + // ensure can create from existing + _, err = flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun) + require.NoError(t, err) + + // ensure error when sharding function does not match + _, err = flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, flatfs.NextToLast(3)) + require.Error(t, err) + require.ErrorContains(t, err, "sharding function mismatch") +} From a14300e8b7080037063dc1ce332f9c47f78c11e3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 22 Jan 2026 14:51:28 +0000 Subject: [PATCH 2/2] chore: appease linter --- pkg/store/objectstore/flatfs/adapter_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/store/objectstore/flatfs/adapter_test.go b/pkg/store/objectstore/flatfs/adapter_test.go index c2f26d8a..ed205cc0 100644 --- a/pkg/store/objectstore/flatfs/adapter_test.go +++ b/pkg/store/objectstore/flatfs/adapter_test.go @@ -14,6 +14,7 @@ func TestFlatFSKeyAdapterStore(t *testing.T) { fun := flatfs.NextToLast(2) memStore := memory.NewStore() flatfsMemStore, err := flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun) + require.NoError(t, err) key := "ciqjkmoqchcaod3rhy26uo57r2ktpxq4lnmhye6njse7l2rkhoetrjy" value := "hello world" @@ -35,7 +36,7 @@ func TestFlatFSKeyAdapterStore(t *testing.T) { require.Equal(t, value, string(b)) // check it was stored at the right path in the underlying store - obj, err = memStore.Get(t.Context(), "rj/"+key+".data") + _, err = memStore.Get(t.Context(), "rj/"+key+".data") require.NoError(t, err) // ensure can create from existing