Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/config/app/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type MinioConfig struct {
Bucket string // bucket name
Credentials Credentials // access credentials
Insecure bool // set to true to disable SSL
FlatFSKeys bool // use FlatFS key adapter
}

// Credentials configures access credentials for Minio.
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type MinioConfig struct {
Bucket string `mapstructure:"bucket" validate:"required" toml:"bucket"`
Credentials Credentials `mapstructure:"credentials" toml:"credentials,omitempty"`
Insecure bool `mapstructure:"insecure" toml:"insecure,omitempty"`
FlatFSKeys bool `mapstructure:"flatfs_keys" toml:"flatfs_keys,omitempty"`
}

// BlobStorageConfig is special configuration allowing blobs to be stored
Expand Down Expand Up @@ -50,6 +51,7 @@ func (r RepoConfig) ToAppConfig() (app.StorageConfig, error) {
Bucket: r.BlobStorage.Minio.Bucket,
Credentials: app.Credentials(r.BlobStorage.Minio.Credentials),
Insecure: r.BlobStorage.Minio.Insecure,
FlatFSKeys: r.BlobStorage.Minio.FlatFSKeys,
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/fx/store/filesystem/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/storacha/piri/pkg/store/claimstore"
"github.com/storacha/piri/pkg/store/delegationstore"
"github.com/storacha/piri/pkg/store/keystore"
"github.com/storacha/piri/pkg/store/objectstore"
"github.com/storacha/piri/pkg/store/objectstore/flatfs"
minio_store "github.com/storacha/piri/pkg/store/objectstore/minio"
"github.com/storacha/piri/pkg/store/receiptstore"
Expand Down Expand Up @@ -272,10 +273,17 @@ func NewPDPStore(cfg app.PDPStoreConfig, lc fx.Lifecycle) (blobstore.PDPStore, e
if creds.AccessKeyID != "" && creds.SecretAccessKey != "" {
options.Creds = credentials.NewStaticV4(creds.AccessKeyID, creds.SecretAccessKey, "")
}
var objStore objectstore.Store
objStore, err := minio_store.New(cfg.Minio.Endpoint, cfg.Minio.Bucket, options)
if err != nil {
return nil, fmt.Errorf("creating pdp object store: %w", err)
}
if cfg.Minio.FlatFSKeys {
objStore, err = flatfs.NewFlatFSKeyAdapter(context.Background(), objStore, flatfs.NextToLast(2))
if err != nil {
return nil, fmt.Errorf("creating flatfs key adapter for pdp object store: %w", err)
}
}
return blobstore.NewObjectBlobstore(objStore), nil
}

Expand Down
84 changes: 84 additions & 0 deletions pkg/store/objectstore/flatfs/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package flatfs

import (
"context"
"fmt"
"io"
"path/filepath"
"strings"

"github.com/storacha/piri/pkg/store/objectstore"
)

type FlatFSKeyAdapterStore struct {
store objectstore.Store
shardStr string
getDir ShardFunc
}

var _ objectstore.Store = (*FlatFSKeyAdapterStore)(nil)

func NewFlatFSKeyAdapter(ctx context.Context, store objectstore.Store, fun *ShardIdV1) (*FlatFSKeyAdapterStore, error) {
obj, err := store.Get(ctx, SHARDING_FN)
if err != nil {
if err != objectstore.ErrNotExist {
return nil, fmt.Errorf("getting sharding info: %w", err)
}
data := fun.String() + "\n"
err := store.Put(ctx, SHARDING_FN, uint64(len(data)), io.NopCloser(strings.NewReader(data)))
if err != nil {
return nil, fmt.Errorf("storing sharding info: %w", err)
}
} else {
body := obj.Body()
defer body.Close()
bytes, err := io.ReadAll(body)
if err != nil {
return nil, fmt.Errorf("reading sharding info: %w", err)
}
id, err := ParseShardFunc(string(bytes))
if err != nil {
return nil, fmt.Errorf("parsing existing sharding info: %w", err)
}
if id.String() != fun.String() {
return nil, fmt.Errorf("sharding function mismatch: store has %q but adapter configured with %q", id.String(), fun.String())
}
}
return &FlatFSKeyAdapterStore{
store: store,
shardStr: fun.String(),
getDir: fun.Func(),
}, nil
}

func (f *FlatFSKeyAdapterStore) Delete(ctx context.Context, key string) error {
// Can't exist in datastore.
if !keyIsValid(key) {
return nil
}
_, file := f.encode(key)
return f.store.Delete(ctx, file)
}

func (f *FlatFSKeyAdapterStore) encode(key string) (dir, file string) {
dir = f.getDir(key)
file = filepath.Join(dir, key+extension)
return dir, file
}

func (f *FlatFSKeyAdapterStore) Get(ctx context.Context, key string, opts ...objectstore.GetOption) (objectstore.Object, error) {
// Can't exist in datastore.
if !keyIsValid(key) {
return nil, objectstore.ErrNotExist
}
_, file := f.encode(key)
return f.store.Get(ctx, file, opts...)
}

func (f *FlatFSKeyAdapterStore) Put(ctx context.Context, key string, size uint64, data io.Reader) error {
if !keyIsValid(key) {
return fmt.Errorf("when putting %q: %w", key, ErrInvalidKey)
}
_, file := f.encode(key)
return f.store.Put(ctx, file, size, data)
}
50 changes: 50 additions & 0 deletions pkg/store/objectstore/flatfs/adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flatfs_test

import (
"io"
"strings"
"testing"

"github.com/storacha/piri/pkg/store/objectstore/flatfs"
"github.com/storacha/piri/pkg/store/objectstore/memory"
"github.com/stretchr/testify/require"
)

func TestFlatFSKeyAdapterStore(t *testing.T) {
fun := flatfs.NextToLast(2)
memStore := memory.NewStore()
flatfsMemStore, err := flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun)
require.NoError(t, err)

key := "ciqjkmoqchcaod3rhy26uo57r2ktpxq4lnmhye6njse7l2rkhoetrjy"
value := "hello world"

err = flatfsMemStore.Put(t.Context(), key, uint64(len(value)), strings.NewReader(value))
require.NoError(t, err)

// ensure SHARDING file was put
_, err = memStore.Get(t.Context(), flatfs.SHARDING_FN)
require.NoError(t, err)

obj, err := flatfsMemStore.Get(t.Context(), key)
require.NoError(t, err)
body := obj.Body()

b, err := io.ReadAll(body)
require.NoError(t, err)
body.Close()
require.Equal(t, value, string(b))

// check it was stored at the right path in the underlying store
_, err = memStore.Get(t.Context(), "rj/"+key+".data")
require.NoError(t, err)

// ensure can create from existing
_, err = flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, fun)
require.NoError(t, err)

// ensure error when sharding function does not match
_, err = flatfs.NewFlatFSKeyAdapter(t.Context(), memStore, flatfs.NextToLast(3))
require.Error(t, err)
require.ErrorContains(t, err, "sharding function mismatch")
}
Loading