Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion storage/sealer/fr32/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"math/bits"

pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
Expand All @@ -14,6 +15,8 @@ type unpadReader struct {

left uint64
work []byte

stash []byte
}

func BufSize(sz abi.PaddedPieceSize) int {
Expand All @@ -31,6 +34,10 @@ func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Re
return nil, xerrors.Errorf("bad piece size: %w", err)
}

if abi.PaddedPieceSize(len(buf)).Validate() != nil {
return nil, xerrors.Errorf("bad buffer size")
}

return &unpadReader{
src: src,

Expand All @@ -40,6 +47,39 @@ func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Re
}

func (r *unpadReader) Read(out []byte) (int, error) {
idealReadSize := abi.PaddedPieceSize(len(r.work)).Unpadded()

var err error
var rn int
if len(r.stash) == 0 && len(out) < int(idealReadSize) {
r.stash = pool.Get(int(idealReadSize))

rn, err = r.readInner(r.stash)
r.stash = r.stash[:rn]
}

if len(r.stash) > 0 {
n := copy(out, r.stash)
r.stash = r.stash[n:]

if len(r.stash) == 0 {
pool.Put(r.stash)
r.stash = nil
}

if err == io.EOF && rn > n {
err = nil
}

return n, err
}

return r.readInner(out)
}

// readInner reads from the underlying reader into the provided buffer.
// It requires that out[] is padded(power-of-two).unpadded()-sized, ideally quite large.
func (r *unpadReader) readInner(out []byte) (int, error) {
if r.left == 0 {
return 0, io.EOF
}
Expand All @@ -52,7 +92,8 @@ func (r *unpadReader) Read(out []byte) (int, error) {
return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
}

todo := abi.PaddedPieceSize(outTwoPow)
// Clamp `todo` to the length of the work buffer to prevent buffer overflows
todo := min(abi.PaddedPieceSize(outTwoPow), abi.PaddedPieceSize(len(r.work)))
if r.left < uint64(todo) {
todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left)))
}
Expand Down
162 changes: 162 additions & 0 deletions storage/sealer/fr32/readers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package fr32_test
import (
"bufio"
"bytes"
"crypto/rand"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -34,3 +36,163 @@ func TestUnpadReader(t *testing.T) {

require.Equal(t, raw, readered)
}

func TestUnpadReaderBufWithSmallWorkBuf(t *testing.T) {
ps := abi.PaddedPieceSize(64 << 20).Unpadded()

raw := bytes.Repeat([]byte{0x77}, int(ps))

padOut := make([]byte, ps.Padded())
fr32.Pad(raw, padOut)

buf := make([]byte, abi.PaddedPieceSize(uint64(128)))
r, err := fr32.NewUnpadReaderBuf(bytes.NewReader(padOut), ps.Padded(), buf)
if err != nil {
t.Fatal(err)
}

// using bufio reader to make sure reads are big enough for the padreader - it can't handle small reads right now
readered, err := io.ReadAll(bufio.NewReaderSize(r, 512))
if err != nil {
t.Fatal(err)
}

require.Equal(t, raw, readered)
}

func TestPadWriterUnpadReader(t *testing.T) {
testCases := []struct {
name string
unpadSize abi.UnpaddedPieceSize
readSizes []int
}{
{
name: "2K with aligned reads",
unpadSize: 2 * 127 * 8, // 2K unpadded
readSizes: []int{127, 127 * 4},
},
{
name: "Small piece, various read sizes",
unpadSize: 2 * 127 * 8, // 1016 bytes unpadded
readSizes: []int{1, 63, 127, 128, 255, 1016},
},
{
name: "Medium piece, various read sizes",
unpadSize: 127 * 1024, // 128KB unpadded
readSizes: []int{1, 127, 128, 255, 1024, 4096, 65536},
},
{
name: "Large piece, various read sizes",
unpadSize: 127 * 1024 * 1024, // 128MB unpadded
readSizes: []int{1, 127, 128, 255, 1024, 4096, 65536, 10 << 20, 11 << 20, 11<<20 + 134},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Generate random unpadded data
unpadded := make([]byte, tc.unpadSize)
n, err := rand.Read(unpadded)
require.NoError(t, err)
require.Equal(t, int(tc.unpadSize), n)

// Create a buffer to store padded data
paddedBuf := new(bytes.Buffer)

// Create and use PadWriter
padWriter := fr32.NewPadWriter(paddedBuf)
written, err := padWriter.Write(unpadded)
require.NoError(t, err)
require.Equal(t, int(tc.unpadSize), written)
require.NoError(t, padWriter.Close())

// Create UnpadReader
paddedSize := tc.unpadSize.Padded()
unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
require.NoError(t, err)

offset := int64(0)
for _, readSize := range tc.readSizes {
t.Run(fmt.Sprintf("ReadSize_%d_Offset_%d", readSize, offset), func(t *testing.T) {
// Seek to offset
require.NoError(t, err)

// Read data
readBuf := make([]byte, readSize)
n, err := io.ReadFull(unpadReader, readBuf)
require.NoError(t, err)
require.Equal(t, readSize, n)

// Compare with original unpadded data
expected := unpadded[offset : offset+int64(len(readBuf))]
require.Equal(t, expected, readBuf)
offset += int64(n)
})
}
})
}
}

func TestUnpadReaderSmallReads(t *testing.T) {
unpadSize := abi.UnpaddedPieceSize(127 * 1024) // 128KB unpadded
unpadded := make([]byte, unpadSize)
n, err := rand.Read(unpadded)
require.NoError(t, err)
require.Equal(t, int(unpadSize), n)

paddedBuf := new(bytes.Buffer)
padWriter := fr32.NewPadWriter(paddedBuf)
_, err = padWriter.Write(unpadded)
require.NoError(t, err)
require.NoError(t, padWriter.Close())

paddedSize := unpadSize.Padded()
unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
require.NoError(t, err)

result := make([]byte, 0, unpadSize)
smallBuf := make([]byte, 1) // Read one byte at a time

for {
n, err := unpadReader.Read(smallBuf)
if err == io.EOF {
break
}
require.NoError(t, err)
result = append(result, smallBuf[:n]...)
}

require.Equal(t, unpadded, result)
}

func TestUnpadReaderLargeReads(t *testing.T) {
unpadSize := abi.UnpaddedPieceSize(127 * 1024 * 1024) // 128MB unpadded
unpadded := make([]byte, unpadSize)
n, err := rand.Read(unpadded)
require.NoError(t, err)
require.Equal(t, int(unpadSize), n)

paddedBuf := new(bytes.Buffer)
padWriter := fr32.NewPadWriter(paddedBuf)
_, err = padWriter.Write(unpadded)
require.NoError(t, err)
require.NoError(t, padWriter.Close())

paddedSize := unpadSize.Padded()
unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
require.NoError(t, err)

largeBuf := make([]byte, 10*1024*1024) // 10MB buffer
result := make([]byte, 0, unpadSize)

for {
n, err := unpadReader.Read(largeBuf)
if err == io.EOF {
break
}
require.NoError(t, err)
result = append(result, largeBuf[:n]...)
}

require.Equal(t, unpadded, result)
}
Loading