Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KeyStreamerAt #7

Merged
merged 6 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 102 additions & 63 deletions adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package osio

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strconv"
Expand All @@ -33,44 +35,54 @@ import (

// KeyReaderAt is the interface that wraps the basic ReadAt method for the specified key.
//
// For performance reasons, as we expect the ReadAt method to perform network access and thus
// be relatively slow, the ReadAt method diverges somewhat from the standard io.ReaderAt interface:
// Deprecated: Use KeyStreamerAt instead.
type KeyReaderAt interface {
	// ReadAt reads len(p) bytes from the resource identified by key into p
	// starting at offset off. It returns the number of bytes read (0 <= n <= len(p)),
	// the total size of the object (implementations return it when called with
	// off == 0), and any error encountered.
	//
	// Deprecated: Use StreamAt instead.
	ReadAt(key string, p []byte, off int64) (int, int64, error)
}

// KeyStreamerAt is the second interface a handler can implement.
//
// • ReadAt should return ENOENT in case of an error due to an inexistant file. This non-existant
// • StreamAt should return ENOENT in case of an error due to a nonexistent file. This nonexistent
// status is cached by the Adapter in order to prevent subsequent calls to the same key.
//
// • ReadAt should return the total size of the object when called with a 0 offset. This is required
// • StreamAt should return the total size of the object when called with a 0 offset. This is required
// in order to implement the io.Seeker interface, and to detect out of bounds accesses without incurring
// a network access. If you do not rely on this functionality, your implementation may return math.MaxInt64

type KeyReaderAt interface {
// ReadAt reads len(p) bytes from the resource identified by key into p
// starting at offset off. It returns the number of bytes read (0 <= n <= len(p)) and
// any error encountered.
type KeyStreamerAt interface {
// StreamAt returns a io.ReadCloser on a section from the resource identified by key
// starting at offset off. It returns any error encountered.
//
// If the read fails because the object does not exist, ReadAt must return syscall.ENOENT
// If the stream fails because the object does not exist, StreamAt must return syscall.ENOENT
// (or a wrapped error of syscall.ENOENT)
//
// When ReadAt returns n < len(p), it returns a non-nil error explaining why more bytes
// were not returned. In this respect, ReadAt is stricter than io.Read.
// The reader returned by StreamAt must follow the standard io.ReadCloser convention with respect
// to error handling.
//
// Even if ReadAt returns n < len(p), it may use all of p as scratch space during the call.
// If some data is available but not len(p) bytes, ReadAt blocks until either all the data
// is available or an error occurs. In this respect ReadAt is different from io.Read.
// Clients of StreamAt can execute parallel StreamAt calls on the same input source.
//
// If the n = len(p) bytes returned by ReadAt are at the end of the input source, ReadAt
// may return either err == io.EOF or err == nil.
//
// If ReadAt is reading from an input source with a seek offset, ReadAt should not affect
// nor be affected by the underlying seek offset.
//
// Clients of ReadAt can execute parallel ReadAt calls on the same input source.
//
// If called with off==0, ReadAt must also return the total object size in its second
// If called with off==0, StreamAt must also return the total object size in its second
// return value
//
// Implementations must not retain p.
ReadAt(key string, p []byte, off int64) (int, int64, error)
// The caller of StreamAt is responsible for closing the stream.
StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error)
}

// readerAtToStreamerAtWrapper adapts a legacy KeyReaderAt so it can be used
// where a KeyStreamerAt is expected (see NewAdapter).
type readerAtToStreamerAtWrapper struct {
	KeyReaderAt
}

// StreamAt implements KeyStreamerAt on top of the wrapped KeyReaderAt: it
// performs a single ReadAt of tot bytes at off and exposes the bytes actually
// read as an in-memory stream.
func (w readerAtToStreamerAtWrapper) StreamAt(key string, off int64, tot int64) (io.ReadCloser, int64, error) {
	scratch := make([]byte, tot)
	nRead, size, err := w.ReadAt(key, scratch, off)
	if err == io.EOF {
		// A short read at end-of-object is not an error for a stream that
		// only carries the nRead bytes that were read.
		err = nil
	}
	stream := ioutil.NopCloser(bytes.NewReader(scratch[:nRead]))
	return stream, size, err
}

// BlockCacher is the interface that wraps block caching functionality
Expand Down Expand Up @@ -99,29 +111,30 @@ type NamedOnceMutex interface {
Unlock(key interface{})
}

// Adapter caches fixed-sized chunks of a KeyReaderAt, and exposes a proxy KeyReaderAt
// that feeds from its internal cache, only falling back to the provided KeyReaderAt whenever
// Adapter caches fixed-sized chunks of a KeyStreamerAt, and exposes
// ReadAt(key string, buf []byte, offset int64) (int, error)
// that feeds from its internal cache, only falling back to the provided KeyStreamerAt whenever
// data could not be retrieved from its internal cache, while ensuring that concurrent requests
// only result in a single call to the source reader.
type Adapter struct {
blockSize int64
blmu NamedOnceMutex
numCachedBlocks int
cache BlockCacher
reader KeyReaderAt
keyStreamer KeyStreamerAt
splitRanges bool
sizeCache *lru.Cache
retries int
}

func (a *Adapter) srcReadAt(key string, buf []byte, off int64) (int, error) {
func (a *Adapter) srcStreamAt(key string, off int64, n int64) (io.ReadCloser, error) {
try := 1
delay := 100 * time.Millisecond
var n int
var r io.ReadCloser
var tot int64
var err error
for {
n, tot, err = a.reader.ReadAt(key, buf, off)
r, tot, err = a.keyStreamer.StreamAt(key, off, n)
if err != nil && try <= a.retries && errs.Temporary(err) {
try++
time.Sleep(delay)
Expand All @@ -135,13 +148,23 @@ func (a *Adapter) srcReadAt(key string, buf []byte, off int64) (int, error) {
if errors.Is(err, syscall.ENOENT) {
a.sizeCache.Add(key, int64(-1))
}
if errors.Is(err, io.EOF) {
a.sizeCache.Add(key, tot)
}
} else {
a.sizeCache.Add(key, tot)
}
}
return r, err
}

// srcReadAt fills p from the source identified by key starting at off, going
// through srcStreamAt (and therefore through its retry and size-caching
// logic). A partial fill is reported as io.EOF, matching io.ReaderAt-style
// semantics.
func (a *Adapter) srcReadAt(key string, p []byte, off int64) (int, error) {
	stream, err := a.srcStreamAt(key, off, int64(len(p)))
	if err != nil {
		return 0, err
	}
	defer stream.Close()
	n, readErr := io.ReadFull(stream, p)
	if readErr == io.ErrUnexpectedEOF {
		// Fewer than len(p) bytes were available: the stream ended early.
		readErr = io.EOF
	}
	return n, readErr
}

Expand Down Expand Up @@ -318,27 +341,40 @@ func (b scao) adapterOpt(a *Adapter) error {

// SizeCache is an option that determines how many key sizes will be cached by
// the adapter. Having a size cache speeds up the opening of files by not requiring
// that a lookup to the KeyReaderAt for the object size.
// that a lookup to the KeyStreamerAt for the object size.
func SizeCache(numEntries int) interface {
	AdapterOption
} {
	// scao carries the requested number of cached size entries and
	// implements AdapterOption.
	opt := scao{numEntries}
	return opt
}

// NewAdapter creates a caching adapter around the provided KeyReaderAt
//
// NewAdapter will only return an error if you do not provide plausible options
// (e.g. negative number of blocks or sizes, nil caches, etc...)
const (
	// DefaultBlockSize is the default value of Adapter.blockSize, in bytes,
	// used when no overriding AdapterOption is supplied.
	DefaultBlockSize = 128 * 1024
	// DefaultNumCachedBlocks is the default value of Adapter.numCachedBlocks
	// used when no overriding AdapterOption is supplied.
	DefaultNumCachedBlocks = 100
)

func NewAdapter(reader KeyReaderAt, opts ...AdapterOption) (*Adapter, error) {
// NewAdapter creates a caching adapter around the provided KeyReaderAt.
//
// NewAdapter will only return an error if you do not provide plausible options
// (e.g. negative number of blocks or sizes, nil caches, etc...)
//
// Deprecated: use NewStreamingAdapter instead.
func NewAdapter(keyReader KeyReaderAt, opts ...AdapterOption) (*Adapter, error) {
	// Use the streaming implementation directly when available; otherwise
	// adapt the legacy reader through the wrapper type.
	streamer, ok := keyReader.(KeyStreamerAt)
	if !ok {
		streamer = readerAtToStreamerAtWrapper{keyReader}
	}
	return NewStreamingAdapter(streamer, opts...)
}

// NewStreamingAdapter creates a caching adapter around the provided KeyStreamerAt.
//
// NewStreamingAdapter will only return an error if you do not provide plausible options
// (e.g. negative number of blocks or sizes, nil caches, etc...)
func NewStreamingAdapter(keyStreamer KeyStreamerAt, opts ...AdapterOption) (*Adapter, error) {
bc := &Adapter{
blockSize: DefaultBlockSize,
numCachedBlocks: DefaultNumCachedBlocks,
reader: reader,
keyStreamer: keyStreamer,
splitRanges: false,
retries: 5,
}
Expand Down Expand Up @@ -367,13 +403,6 @@ type blockRange struct {
end int64
}

// min returns the smaller of n1 and n2.
func min(n1, n2 int64) int64 {
	// The original returned int64(n1): a redundant conversion, since n1 is
	// already int64. Returning the value directly is equivalent and cleaner.
	if n1 < n2 {
		return n1
	}
	return n2
}

func (a *Adapter) getRange(key string, rng blockRange) ([][]byte, error) {
blocks := make([][]byte, rng.end-rng.start+1)
toFetch := make([]bool, rng.end-rng.start+1)
Expand All @@ -385,25 +414,35 @@ func (a *Adapter) getRange(key string, rng blockRange) ([][]byte, error) {
}
}
if nToFetch == len(blocks) {
buf := make([]byte, (rng.end-rng.start+1)*a.blockSize)
n, err := a.srcReadAt(key, buf, rng.start*a.blockSize)
if err != nil && !errors.Is(err, io.EOF) {
r, err := a.srcStreamAt(key, rng.start*a.blockSize, (rng.end-rng.start+1)*a.blockSize)
if err != nil {
for i := rng.start; i <= rng.end; i++ {
blockID := a.blockKey(key, i)
a.blmu.Unlock(blockID)
}
return nil, err
}
left := int64(n)
for bid := int64(0); bid <= rng.end-rng.start && left > 0; bid++ {
ll := min(left, a.blockSize)
blocks[bid] = make([]byte, ll)
copy(blocks[bid], buf[bid*a.blockSize:bid*a.blockSize+ll])
left -= ll
a.cache.Add(key, uint(rng.start+bid), blocks[bid])
}
for i := rng.start; i <= rng.end; i++ {
blockID := a.blockKey(key, i)
defer r.Close()
for bid := int64(0); bid <= rng.end-rng.start; bid++ {
blockID := a.blockKey(key, bid+rng.start)
buf := make([]byte, a.blockSize)
n, err := io.ReadFull(r, buf)
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
if err == nil || err == io.EOF {
blocks[bid] = buf[:n]
a.cache.Add(key, uint(rng.start+bid), blocks[bid])
}
if err != nil {
for i := rng.start + bid; i <= rng.end; i++ {
a.blmu.Unlock(a.blockKey(key, i))
}
if err == io.EOF {
break
}
return nil, err
}
a.blmu.Unlock(blockID)
}
return blocks, nil
Expand Down Expand Up @@ -609,7 +648,7 @@ func (a *Adapter) getBlock(key string, id int64) ([]byte, error) {
blockID := a.blockKey(key, id)
if a.blmu.Lock(blockID) {
buf := make([]byte, a.blockSize)
n, err := a.srcReadAt(key, buf, int64(id)*int64(a.blockSize))
n, err := a.srcReadAt(key, buf, int64(id)*a.blockSize)
if err != nil && !errors.Is(err, io.EOF) {
a.blmu.Unlock(blockID)
return nil, err
Expand Down
24 changes: 11 additions & 13 deletions gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,29 @@ func GCSHandle(ctx context.Context, opts ...GCSOption) (*GCSHandler, error) {
return handler, nil
}

func (gcs *GCSHandler) ReadAt(key string, p []byte, off int64) (int, int64, error) {
func (gcs *GCSHandler) StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error) {
bucket, object := osuriparse("gs", key)
if len(bucket) == 0 || len(object) == 0 {
return 0, 0, fmt.Errorf("invalid key")
return nil, 0, fmt.Errorf("invalid key")
}
gbucket := gcs.client.Bucket(bucket)
if gcs.billingProjectID != "" {
gbucket = gbucket.UserProject(gcs.billingProjectID)
}
r, err := gbucket.Object(object).NewRangeReader(gcs.ctx, off, int64(len(p)))
//fmt.Printf("read %s [%d-%d]\n", key, off, off+int64(len(p)))
r, err := gbucket.Object(object).NewRangeReader(gcs.ctx, off, n)
if err != nil {
var gerr *googleapi.Error
if off > 0 && errors.As(err, &gerr) && gerr.Code == 416 {
return 0, 0, io.EOF
return nil, 0, io.EOF
}
if errors.Is(err, storage.ErrObjectNotExist) || errors.Is(err, storage.ErrBucketNotExist) {
return 0, -1, syscall.ENOENT
return nil, -1, syscall.ENOENT
}
return 0, 0, fmt.Errorf("new reader for gs://%s/%s: %w", bucket, object, err)
}
defer r.Close()
n, err := io.ReadFull(r, p)
if err == io.ErrUnexpectedEOF {
err = io.EOF
return nil, 0, fmt.Errorf("new reader for gs://%s/%s: %w", bucket, object, err)
}
return n, r.Attrs.Size, err
return r, r.Attrs.Size, err
}

// ReadAt is kept only so *GCSHandler still satisfies the legacy KeyReaderAt
// interface; it must never be called — use StreamAt instead.
//
// Deprecated: use StreamAt instead.
func (gcs *GCSHandler) ReadAt(key string, p []byte, off int64) (int, int64, error) {
	panic("deprecated (kept for retrocompatibility)")
}
35 changes: 14 additions & 21 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,60 +71,53 @@ func HTTPHandle(ctx context.Context, opts ...HTTPOption) (*HTTPHandler, error) {
return handler, nil
}

func handleResponse(r *http.Response) (int, int64, error) {
func handleResponse(r *http.Response) (io.ReadCloser, int64, error) {
if r.StatusCode == 404 {
return 0, -1, syscall.ENOENT
return nil, -1, syscall.ENOENT
}
if r.StatusCode == 416 {
return 0, 0, io.EOF
return nil, 0, io.EOF
}
return 0, 0, fmt.Errorf("new reader for %s: status code %d", r.Request.URL.String(), r.StatusCode)
return nil, 0, fmt.Errorf("new reader for %s: status code %d", r.Request.URL.String(), r.StatusCode)
}

func (h *HTTPHandler) ReadAt(url string, p []byte, off int64) (int, int64, error) {
func (h *HTTPHandler) StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error) {
// HEAD request to get object size as it is not returned in range requests
var size int64
if off == 0 {
req, _ := http.NewRequest("HEAD", url, nil)
req, _ := http.NewRequest("HEAD", key, nil)
req = req.WithContext(h.ctx)
for _, mw := range h.requestMiddlewares {
mw(req)
}

r, err := h.client.Do(req)
if err != nil {
return 0, 0, fmt.Errorf("new reader for %s: %w", url, err)
return nil, 0, fmt.Errorf("new reader for %s: %w", key, err)
}
defer r.Body.Close()

if r.StatusCode != 200 {
return handleResponse(r)
}

size = r.ContentLength
}

// GET request to fetch range
req, _ := http.NewRequest("GET", url, nil)
req, _ := http.NewRequest("GET", key, nil)
req = req.WithContext(h.ctx)
for _, mw := range h.requestMiddlewares {
mw(req)
}
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1))

req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", off, off+n-1))
r, err := h.client.Do(req)
if err != nil {
return 0, 0, fmt.Errorf("new reader for %s: %w", url, err)
return nil, 0, fmt.Errorf("new reader for %s: %w", key, err)
}
defer r.Body.Close()

if r.StatusCode != 200 && r.StatusCode != 206 {
return handleResponse(r)
}
return r.Body, size, err
}

n, err := io.ReadFull(r.Body, p)
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
return n, size, err
// ReadAt is kept only so *HTTPHandler still satisfies the legacy KeyReaderAt
// interface; it must never be called — use StreamAt instead.
//
// Deprecated: use StreamAt instead.
func (h *HTTPHandler) ReadAt(key string, p []byte, off int64) (int, int64, error) {
	panic("deprecated (kept for retrocompatibility)")
}
Loading