Skip to content

Commit

Permalink
Replace KeyReaderAt by KeyStreamerAt (#7)
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Coquet <[email protected]>
  • Loading branch information
thomascoquet and Thomas Coquet authored Sep 16, 2021
1 parent 2ae1ca4 commit 2e02b06
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 114 deletions.
165 changes: 102 additions & 63 deletions adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package osio

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strconv"
Expand All @@ -33,44 +35,54 @@ import (

// KeyReaderAt is the interface that wraps the basic ReadAt method for the specified key.
//
// For performance reasons, as we expect the ReadAt method to perform network access and thus
// be relatively slow, the ReadAt method diverges somewhat from the standard io.ReaderAt interface:
// Deprecated: Use KeyStreamerAt instead.
type KeyReaderAt interface {
	// ReadAt reads len(p) bytes from the resource identified by key into p
	// starting at offset off. It returns the number of bytes read (0 <= n <= len(p)),
	// the total object size (as the second return value, when off == 0), and
	// any error encountered.
	//
	// Deprecated: Use StreamAt instead.
	ReadAt(key string, p []byte, off int64) (int, int64, error)
}

// KeyStreamerAt is the second interface a handler can implement.
//
// • ReadAt should return ENOENT in case of an error due to a nonexistent file. This nonexistent
// • StreamAt should return ENOENT in case of an error due to a nonexistent file. This nonexistent
// status is cached by the Adapter in order to prevent subsequent calls to the same key.
//
// • ReadAt should return the total size of the object when called with a 0 offset. This is required
// • StreamAt should return the total size of the object when called with a 0 offset. This is required
// in order to implement the io.Seeker interface, and to detect out of bounds accesses without incurring
// a network access. If you do not rely on this functionality, your implementation may return math.MaxInt64

type KeyReaderAt interface {
// ReadAt reads len(p) bytes from the resource identified by key into p
// starting at offset off. It returns the number of bytes read (0 <= n <= len(p)) and
// any error encountered.
type KeyStreamerAt interface {
// StreamAt returns a io.ReadCloser on a section from the resource identified by key
// starting at offset off. It returns any error encountered.
//
// If the read fails because the object does not exist, ReadAt must return syscall.ENOENT
// If the stream fails because the object does not exist, StreamAt must return syscall.ENOENT
// (or a wrapped error of syscall.ENOENT)
//
// When ReadAt returns n < len(p), it returns a non-nil error explaining why more bytes
// were not returned. In this respect, ReadAt is stricter than io.Read.
// The reader returned by StreamAt must follow the standard io.ReadCloser convention with respect
// to error handling.
//
// Even if ReadAt returns n < len(p), it may use all of p as scratch space during the call.
// If some data is available but not len(p) bytes, ReadAt blocks until either all the data
// is available or an error occurs. In this respect ReadAt is different from io.Read.
// Clients of StreamAt can execute parallel StreamAt calls on the same input source.
//
// If the n = len(p) bytes returned by ReadAt are at the end of the input source, ReadAt
// may return either err == io.EOF or err == nil.
//
// If ReadAt is reading from an input source with a seek offset, ReadAt should not affect
// nor be affected by the underlying seek offset.
//
// Clients of ReadAt can execute parallel ReadAt calls on the same input source.
//
// If called with off==0, ReadAt must also return the total object size in its second
// If called with off==0, StreamAt must also return the total object size in its second
// return value
//
// Implementations must not retain p.
ReadAt(key string, p []byte, off int64) (int, int64, error)
// The caller of StreamAt is responsible for closing the stream.
StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error)
}

// readerAtToStreamerAtWrapper adapts a legacy KeyReaderAt so it can be used
// where a KeyStreamerAt is expected.
type readerAtToStreamerAtWrapper struct {
	KeyReaderAt
}

// StreamAt implements KeyStreamerAt on top of the embedded ReadAt: it reads
// up to tot bytes starting at off into a temporary buffer and exposes that
// buffer as an in-memory stream. The second return value is whatever size
// the wrapped ReadAt reports.
func (w readerAtToStreamerAtWrapper) StreamAt(key string, off int64, tot int64) (io.ReadCloser, int64, error) {
	p := make([]byte, tot)
	n, size, err := w.ReadAt(key, p, off)
	if err == io.EOF {
		// A short read at end-of-object is not an error for StreamAt; the
		// returned reader simply yields fewer than tot bytes.
		err = nil
	}
	return ioutil.NopCloser(bytes.NewReader(p[:n])), size, err
}

// BlockCacher is the interface that wraps block caching functionality
Expand Down Expand Up @@ -99,29 +111,30 @@ type NamedOnceMutex interface {
Unlock(key interface{})
}

// Adapter caches fixed-sized chunks of a KeyReaderAt, and exposes a proxy KeyReaderAt
// that feeds from its internal cache, only falling back to the provided KeyReaderAt whenever
// Adapter caches fixed-sized chunks of a KeyStreamerAt, and exposes
// ReadAt(key string, buf []byte, offset int64) (int, error)
// that feeds from its internal cache, only falling back to the provided KeyStreamerAt whenever
// data could not be retrieved from its internal cache, while ensuring that concurrent requests
// only result in a single call to the source reader.
type Adapter struct {
blockSize int64
blmu NamedOnceMutex
numCachedBlocks int
cache BlockCacher
reader KeyReaderAt
keyStreamer KeyStreamerAt
splitRanges bool
sizeCache *lru.Cache
retries int
}

func (a *Adapter) srcReadAt(key string, buf []byte, off int64) (int, error) {
func (a *Adapter) srcStreamAt(key string, off int64, n int64) (io.ReadCloser, error) {
try := 1
delay := 100 * time.Millisecond
var n int
var r io.ReadCloser
var tot int64
var err error
for {
n, tot, err = a.reader.ReadAt(key, buf, off)
r, tot, err = a.keyStreamer.StreamAt(key, off, n)
if err != nil && try <= a.retries && errs.Temporary(err) {
try++
time.Sleep(delay)
Expand All @@ -135,13 +148,23 @@ func (a *Adapter) srcReadAt(key string, buf []byte, off int64) (int, error) {
if errors.Is(err, syscall.ENOENT) {
a.sizeCache.Add(key, int64(-1))
}
if errors.Is(err, io.EOF) {
a.sizeCache.Add(key, tot)
}
} else {
a.sizeCache.Add(key, tot)
}
}
return r, err
}

// srcReadAt fills p from the underlying source starting at off and reports
// how many bytes were read. A short read at end-of-object is reported as
// io.EOF, matching the io.ReaderAt convention.
func (a *Adapter) srcReadAt(key string, p []byte, off int64) (int, error) {
	stream, err := a.srcStreamAt(key, off, int64(len(p)))
	if err != nil {
		return 0, err
	}
	defer stream.Close()

	count, readErr := io.ReadFull(stream, p)
	if readErr == io.ErrUnexpectedEOF {
		// io.ReadFull reports a short read as ErrUnexpectedEOF; callers of
		// srcReadAt expect plain io.EOF in that situation.
		readErr = io.EOF
	}
	return count, readErr
}

Expand Down Expand Up @@ -318,27 +341,40 @@ func (b scao) adapterOpt(a *Adapter) error {

// SizeCache is an option that determines how many key sizes will be cached by
// the adapter. Having a size cache speeds up the opening of files by not requiring
// that a lookup to the KeyReaderAt for the object size.
// that a lookup to the KeyStreamerAt for the object size.
func SizeCache(numEntries int) interface {
	AdapterOption
} {
	// scao carries the entry count and applies it to the Adapter's size
	// cache when the option is processed.
	return scao{numEntries}
}

// NewAdapter creates a caching adapter around the provided KeyReaderAt
//
// NewAdapter will only return an error if you do not provide plausible options
// (e.g. negative number of blocks or sizes, nil caches, etc...)
const (
	// DefaultBlockSize is the cache block size (128 KiB) used by
	// NewStreamingAdapter unless overridden through an AdapterOption.
	DefaultBlockSize = 128 * 1024
	// DefaultNumCachedBlocks is the default number of blocks kept in the
	// adapter's cache unless overridden through an AdapterOption.
	DefaultNumCachedBlocks = 100
)

func NewAdapter(reader KeyReaderAt, opts ...AdapterOption) (*Adapter, error) {
// NewAdapter creates a caching adapter around the provided KeyReaderAt.
//
// NewAdapter will only return an error if you do not provide plausible options
// (e.g. negative number of blocks or sizes, nil caches, etc...)
//
// Deprecated: use NewStreamingAdapter instead.
func NewAdapter(keyReader KeyReaderAt, opts ...AdapterOption) (*Adapter, error) {
	// Prefer the handler's native streaming implementation when it provides
	// one; otherwise bridge the legacy ReadAt behind a streaming shim.
	streamer, ok := keyReader.(KeyStreamerAt)
	if !ok {
		streamer = readerAtToStreamerAtWrapper{keyReader}
	}
	return NewStreamingAdapter(streamer, opts...)
}

// NewStreamingAdapter creates a caching adapter around the provided KeyStreamerAt.
//
// NewStreamingAdapter will only return an error if you do not provide plausible options
// (e.g. negative number of blocks or sizes, nil caches, etc...)
func NewStreamingAdapter(keyStreamer KeyStreamerAt, opts ...AdapterOption) (*Adapter, error) {
bc := &Adapter{
blockSize: DefaultBlockSize,
numCachedBlocks: DefaultNumCachedBlocks,
reader: reader,
keyStreamer: keyStreamer,
splitRanges: false,
retries: 5,
}
Expand Down Expand Up @@ -367,13 +403,6 @@ type blockRange struct {
end int64
}

// min returns the smaller of the two int64 values.
func min(n1, n2 int64) int64 {
	if n1 > n2 {
		return n2
	}
	// n1 is already int64; the previous int64(n1) conversion was redundant.
	return n1
}

func (a *Adapter) getRange(key string, rng blockRange) ([][]byte, error) {
blocks := make([][]byte, rng.end-rng.start+1)
toFetch := make([]bool, rng.end-rng.start+1)
Expand All @@ -385,25 +414,35 @@ func (a *Adapter) getRange(key string, rng blockRange) ([][]byte, error) {
}
}
if nToFetch == len(blocks) {
buf := make([]byte, (rng.end-rng.start+1)*a.blockSize)
n, err := a.srcReadAt(key, buf, rng.start*a.blockSize)
if err != nil && !errors.Is(err, io.EOF) {
r, err := a.srcStreamAt(key, rng.start*a.blockSize, (rng.end-rng.start+1)*a.blockSize)
if err != nil {
for i := rng.start; i <= rng.end; i++ {
blockID := a.blockKey(key, i)
a.blmu.Unlock(blockID)
}
return nil, err
}
left := int64(n)
for bid := int64(0); bid <= rng.end-rng.start && left > 0; bid++ {
ll := min(left, a.blockSize)
blocks[bid] = make([]byte, ll)
copy(blocks[bid], buf[bid*a.blockSize:bid*a.blockSize+ll])
left -= ll
a.cache.Add(key, uint(rng.start+bid), blocks[bid])
}
for i := rng.start; i <= rng.end; i++ {
blockID := a.blockKey(key, i)
defer r.Close()
for bid := int64(0); bid <= rng.end-rng.start; bid++ {
blockID := a.blockKey(key, bid+rng.start)
buf := make([]byte, a.blockSize)
n, err := io.ReadFull(r, buf)
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
if err == nil || err == io.EOF {
blocks[bid] = buf[:n]
a.cache.Add(key, uint(rng.start+bid), blocks[bid])
}
if err != nil {
for i := rng.start + bid; i <= rng.end; i++ {
a.blmu.Unlock(a.blockKey(key, i))
}
if err == io.EOF {
break
}
return nil, err
}
a.blmu.Unlock(blockID)
}
return blocks, nil
Expand Down Expand Up @@ -609,7 +648,7 @@ func (a *Adapter) getBlock(key string, id int64) ([]byte, error) {
blockID := a.blockKey(key, id)
if a.blmu.Lock(blockID) {
buf := make([]byte, a.blockSize)
n, err := a.srcReadAt(key, buf, int64(id)*int64(a.blockSize))
n, err := a.srcReadAt(key, buf, int64(id)*a.blockSize)
if err != nil && !errors.Is(err, io.EOF) {
a.blmu.Unlock(blockID)
return nil, err
Expand Down
24 changes: 11 additions & 13 deletions gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,29 @@ func GCSHandle(ctx context.Context, opts ...GCSOption) (*GCSHandler, error) {
return handler, nil
}

func (gcs *GCSHandler) ReadAt(key string, p []byte, off int64) (int, int64, error) {
func (gcs *GCSHandler) StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error) {
bucket, object := osuriparse("gs", key)
if len(bucket) == 0 || len(object) == 0 {
return 0, 0, fmt.Errorf("invalid key")
return nil, 0, fmt.Errorf("invalid key")
}
gbucket := gcs.client.Bucket(bucket)
if gcs.billingProjectID != "" {
gbucket = gbucket.UserProject(gcs.billingProjectID)
}
r, err := gbucket.Object(object).NewRangeReader(gcs.ctx, off, int64(len(p)))
//fmt.Printf("read %s [%d-%d]\n", key, off, off+int64(len(p)))
r, err := gbucket.Object(object).NewRangeReader(gcs.ctx, off, n)
if err != nil {
var gerr *googleapi.Error
if off > 0 && errors.As(err, &gerr) && gerr.Code == 416 {
return 0, 0, io.EOF
return nil, 0, io.EOF
}
if errors.Is(err, storage.ErrObjectNotExist) || errors.Is(err, storage.ErrBucketNotExist) {
return 0, -1, syscall.ENOENT
return nil, -1, syscall.ENOENT
}
return 0, 0, fmt.Errorf("new reader for gs://%s/%s: %w", bucket, object, err)
}
defer r.Close()
n, err := io.ReadFull(r, p)
if err == io.ErrUnexpectedEOF {
err = io.EOF
return nil, 0, fmt.Errorf("new reader for gs://%s/%s: %w", bucket, object, err)
}
return n, r.Attrs.Size, err
return r, r.Attrs.Size, err
}

// ReadAt is the legacy KeyReaderAt entry point.
//
// Deprecated: use StreamAt instead. The stub is kept for retrocompatibility
// (so GCSHandler still satisfies KeyReaderAt) but panics unconditionally if
// called.
func (gcs *GCSHandler) ReadAt(key string, p []byte, off int64) (int, int64, error) {
	panic("deprecated (kept for retrocompatibility)")
}
35 changes: 14 additions & 21 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,60 +71,53 @@ func HTTPHandle(ctx context.Context, opts ...HTTPOption) (*HTTPHandler, error) {
return handler, nil
}

func handleResponse(r *http.Response) (int, int64, error) {
func handleResponse(r *http.Response) (io.ReadCloser, int64, error) {
if r.StatusCode == 404 {
return 0, -1, syscall.ENOENT
return nil, -1, syscall.ENOENT
}
if r.StatusCode == 416 {
return 0, 0, io.EOF
return nil, 0, io.EOF
}
return 0, 0, fmt.Errorf("new reader for %s: status code %d", r.Request.URL.String(), r.StatusCode)
return nil, 0, fmt.Errorf("new reader for %s: status code %d", r.Request.URL.String(), r.StatusCode)
}

func (h *HTTPHandler) ReadAt(url string, p []byte, off int64) (int, int64, error) {
func (h *HTTPHandler) StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error) {
// HEAD request to get object size as it is not returned in range requests
var size int64
if off == 0 {
req, _ := http.NewRequest("HEAD", url, nil)
req, _ := http.NewRequest("HEAD", key, nil)
req = req.WithContext(h.ctx)
for _, mw := range h.requestMiddlewares {
mw(req)
}

r, err := h.client.Do(req)
if err != nil {
return 0, 0, fmt.Errorf("new reader for %s: %w", url, err)
return nil, 0, fmt.Errorf("new reader for %s: %w", key, err)
}
defer r.Body.Close()

if r.StatusCode != 200 {
return handleResponse(r)
}

size = r.ContentLength
}

// GET request to fetch range
req, _ := http.NewRequest("GET", url, nil)
req, _ := http.NewRequest("GET", key, nil)
req = req.WithContext(h.ctx)
for _, mw := range h.requestMiddlewares {
mw(req)
}
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1))

req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", off, off+n-1))
r, err := h.client.Do(req)
if err != nil {
return 0, 0, fmt.Errorf("new reader for %s: %w", url, err)
return nil, 0, fmt.Errorf("new reader for %s: %w", key, err)
}
defer r.Body.Close()

if r.StatusCode != 200 && r.StatusCode != 206 {
return handleResponse(r)
}
return r.Body, size, err
}

n, err := io.ReadFull(r.Body, p)
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
return n, size, err
// ReadAt is the legacy KeyReaderAt entry point.
//
// Deprecated: use StreamAt instead. The stub is kept for retrocompatibility
// (so HTTPHandler still satisfies KeyReaderAt) but panics unconditionally if
// called.
func (h *HTTPHandler) ReadAt(key string, p []byte, off int64) (int, int64, error) {
	panic("deprecated (kept for retrocompatibility)")
}
Loading

0 comments on commit 2e02b06

Please sign in to comment.