Macneale4/archive fetch #8843

Closed
wants to merge 14 commits
784 changes: 403 additions & 381 deletions go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion go/libraries/doltcore/remotesrv/grpc.go
@@ -295,7 +295,12 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
var ranges []*remotesapi.RangeChunk
for h, r := range hashToRange {
hCpy := h
ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length})
ranges = append(ranges, &remotesapi.RangeChunk{
Hash: hCpy[:],
Offset: r.Offset,
Length: r.Length,
DictionaryOffset: r.DictOffset,
DictionaryLength: r.DictLength})
}

url := rs.getDownloadUrl(md, prefix+"/"+loc)
11 changes: 9 additions & 2 deletions go/libraries/doltcore/remotesrv/http.go
@@ -29,6 +29,7 @@ import (
"strconv"
"strings"

"github.com/dolthub/dolt/go/store/nbs"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -94,12 +95,18 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
respWr.WriteHeader(http.StatusBadRequest)
return
}
_, ok := hash.MaybeParse(path[i+1:])

fileName := path[i+1:]
if strings.HasSuffix(fileName, nbs.ArchiveFileSuffix) {
fileName = fileName[:len(fileName)-len(nbs.ArchiveFileSuffix)]
}
_, ok := hash.MaybeParse(fileName)
if !ok {
logger.WithField("last_path_component", path[i+1:]).Warn("bad request with unparseable last path component")
logger.WithField("last_path_component", fileName).Warn("bad request with unparseable last path component")
respWr.WriteHeader(http.StatusBadRequest)
return
}

abs, err := fh.fs.Abs(path)
if err != nil {
logger.WithError(err).Error("could not get absolute path")
10 changes: 0 additions & 10 deletions go/libraries/doltcore/remotesrv/server.go
@@ -17,7 +17,6 @@ package remotesrv
import (
"context"
"crypto/tls"
"errors"
"net"
"net/http"
"strings"
@@ -29,7 +28,6 @@ import (
"google.golang.org/grpc"

remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)

@@ -80,14 +78,6 @@ func NewServer(args ServerArgs) (*Server, error) {
args.Logger = logrus.NewEntry(logrus.StandardLogger())
}

storageMetadata, err := env.GetMultiEnvStorageMetadata(args.FS)
if err != nil {
return nil, err
}
if storageMetadata.ArchiveFilesPresent() {
return nil, errors.New("archive files present. Please run `dolt archive --revert` before running the server.")
}

s := new(Server)
s.stopChan = make(chan struct{})

8 changes: 4 additions & 4 deletions go/libraries/doltcore/remotestorage/chunk_cache.go
@@ -22,20 +22,20 @@ import (
// ChunkCache is an interface used for caching chunks
type ChunkCache interface {
// Put puts a slice of chunks into the cache.
Put(c []nbs.CompressedChunk) bool
Put(c []nbs.ToChunker) bool

// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
Get(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk
Get(h hash.HashSet) map[hash.Hash]nbs.ToChunker

// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
Has(h hash.HashSet) (absent hash.HashSet)

// PutChunk puts a single chunk in the cache. true is returned in the event that the chunk was cached successfully,
// and false is returned if that chunk is already in the cache.
PutChunk(chunk nbs.CompressedChunk) bool
PutChunk(chunk nbs.ToChunker) bool

// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk
GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker
}
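
To make the interface change concrete, here is a minimal map-backed sketch of ChunkCache over nbs.ToChunker. It is illustrative only, not code from this PR: it assumes nbs.ToChunker exposes Hash(), treats a zero nbs.CompressedChunk as the empty placeholder, and omits the locking and size bounds a real cache needs.

// mapChunkCache is a hypothetical, unsynchronized sketch of ChunkCache.
type mapChunkCache struct {
	chunks  map[hash.Hash]nbs.ToChunker
	toFlush map[hash.Hash]nbs.ToChunker
}

func (m *mapChunkCache) Put(cs []nbs.ToChunker) bool {
	for _, c := range cs {
		m.chunks[c.Hash()] = c
	}
	return true
}

func (m *mapChunkCache) Get(hs hash.HashSet) map[hash.Hash]nbs.ToChunker {
	res := make(map[hash.Hash]nbs.ToChunker, len(hs))
	for h := range hs {
		if c, ok := m.chunks[h]; ok {
			res[h] = c
		} else {
			res[h] = nbs.CompressedChunk{} // empty placeholder for a miss
		}
	}
	return res
}

func (m *mapChunkCache) Has(hs hash.HashSet) (absent hash.HashSet) {
	absent = make(hash.HashSet)
	for h := range hs {
		if _, ok := m.chunks[h]; !ok {
			absent.Insert(h)
		}
	}
	return absent
}

func (m *mapChunkCache) PutChunk(c nbs.ToChunker) bool {
	if _, ok := m.chunks[c.Hash()]; ok {
		return false
	}
	m.chunks[c.Hash()] = c
	m.toFlush[c.Hash()] = c
	return true
}

func (m *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
	out := m.toFlush
	m.toFlush = make(map[hash.Hash]nbs.ToChunker)
	return out
}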
181 changes: 171 additions & 10 deletions go/libraries/doltcore/remotestorage/chunk_fetcher.go
@@ -18,9 +18,12 @@ import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dolthub/gozstd"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

@@ -30,6 +33,8 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage/internal/reliable"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"

"github.com/hashicorp/golang-lru/v2"
)

// A remotestorage.ChunkFetcher is a pipelined chunk fetcher for fetching a
@@ -48,8 +53,14 @@ type ChunkFetcher struct {
eg *errgroup.Group
egCtx context.Context

// toGetCh is the channel used to request chunks. This will be initially given a root,
// and as refs are found, they will be added to the channel for workers to batch and request.
toGetCh chan hash.HashSet
resCh chan nbs.CompressedChunk

// resCh is the results channel for the fetcher. It is used both to return
// chunks themselves and to indicate which requested chunks are missing:
// a missing chunk is delivered with its Hash set but an otherwise empty record.
resCh chan nbs.ToChunker

abortCh chan struct{}
stats StatsRecorder
@@ -62,14 +73,21 @@
reliableCallDeliverRespTimeout = 15 * time.Second
)

var globalDictCache *dictionaryCache
var once sync.Once

func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
once.Do(func() {
globalDictCache = NewDictionaryCache(dcs.csClient)
})

eg, ctx := errgroup.WithContext(ctx)
ret := &ChunkFetcher{
eg: eg,
egCtx: ctx,

toGetCh: make(chan hash.HashSet),
resCh: make(chan nbs.CompressedChunk),
resCh: make(chan nbs.ToChunker),

abortCh: make(chan struct{}),
stats: StatsFactory(),
@@ -123,7 +141,7 @@ func (f *ChunkFetcher) CloseSend() error {
// by |Get|. Returns |io.EOF| after |CloseSend| is called and all requested
// chunks have been successfully received. Returns an error if this
// |ChunkFetcher| is terminally failed or if the supplied |ctx| is |Done|.
func (f *ChunkFetcher) Recv(ctx context.Context) (nbs.CompressedChunk, error) {
func (f *ChunkFetcher) Recv(ctx context.Context) (nbs.ToChunker, error) {
select {
case <-ctx.Done():
return nbs.CompressedChunk{}, context.Cause(ctx)
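
Taken together, |Get|, |CloseSend|, and |Recv| imply a calling protocol roughly like the sketch below. This is a hypothetical helper, not part of the PR; it assumes Get accepts a hash.HashSet and that ToChunker values expose IsEmpty() to mark requested-but-missing chunks.

// fetchAll shows the intended protocol: request hashes, close the send
// side, then drain Recv until io.EOF.
func fetchAll(ctx context.Context, f *ChunkFetcher, hashes hash.HashSet) error {
	defer f.Close()
	if err := f.Get(ctx, hashes); err != nil {
		return err
	}
	if err := f.CloseSend(); err != nil {
		return err
	}
	for {
		tc, err := f.Recv(ctx)
		if err == io.EOF {
			return nil // all requested chunks were delivered
		}
		if err != nil {
			return err
		}
		if tc.IsEmpty() {
			continue // requested but missing: Hash is set, contents are empty
		}
		// hand tc off to the consuming pipeline here
	}
}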
@@ -219,7 +237,7 @@ func fetcherHashSetToGetDlLocsReqsThread(ctx context.Context, reqCh chan hash.Ha
// delivered in |reqCh|, and they will be delivered in order.
//
// This function handles backoff and retries for the underlying streaming RPC.
func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.GetDownloadLocsRequest, resCh chan []*remotesapi.DownloadLoc, client remotesapi.ChunkStoreServiceClient, storeRepoToken func(string), missingChunkCh chan nbs.CompressedChunk, host string) error {
func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.GetDownloadLocsRequest, resCh chan []*remotesapi.DownloadLoc, client remotesapi.ChunkStoreServiceClient, storeRepoToken func(string), missingChunkCh chan nbs.ToChunker, host string) error {
stream, err := reliable.MakeCall[*remotesapi.GetDownloadLocsRequest, *remotesapi.GetDownloadLocsResponse](
ctx,
reliable.CallOptions[*remotesapi.GetDownloadLocsRequest, *remotesapi.GetDownloadLocsResponse]{
@@ -362,7 +380,7 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range gr.Ranges {
d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length)
d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
}
}

@@ -371,9 +389,11 @@ func toGetRange(rs []*ranges.GetRange) *GetRange {
for _, r := range rs {
ret.Url = r.Url
ret.Ranges = append(ret.Ranges, &remotesapi.RangeChunk{
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
DictionaryOffset: r.DictionaryOffset,
DictionaryLength: r.DictionaryLength,
})
}
return ret
@@ -527,7 +547,7 @@ func (cc *ConcurrencyControl) Run(ctx context.Context, done <-chan struct{}, ss
}
}

func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, doneCh chan struct{}, chunkCh chan nbs.CompressedChunk, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, doneCh chan struct{}, chunkCh chan nbs.ToChunker, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
eg, ctx := errgroup.WithContext(ctx)
cc := &ConcurrencyControl{
MaxConcurrency: params.MaximumConcurrentDownloads,
@@ -559,7 +579,7 @@ func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, do
return nil
}

func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, doneCh <-chan struct{}, chunkCh chan nbs.CompressedChunk, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, doneCh <-chan struct{}, chunkCh chan nbs.ToChunker, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
respCh := make(chan fetchResp)
cancelCh := make(chan struct{})
for {
@@ -587,3 +607,144 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
}
}
}

// dictionaryCache caches dictionaries for the chunks in an archive store. When we fetch from a database with an archive,
// we get back the path/offset/length of the dictionary for each chunk. Dictionaries are, by design, shared across many
// chunks, so we don't want to request the same dictionary multiple times.
//
// Currently (Feb '25), archives generally have only a default dictionary, so this is somewhat overkill. It mainly plans
// ahead for when chunk grouping is the default and there could be thousands of dictionaries.
type dictionaryCache struct {
cache *lru.TwoQueueCache[DictionaryKey, *gozstd.DDict]
pending sync.Map
client remotesapi.ChunkStoreServiceClient
dlds downloads
}

// DictionaryKey is a globally unique identifier for an archive dictionary.
type DictionaryKey struct {
// path is the short URL of the resource, not including the query parameters, which are provided by the
// locationRefresher.
path string
off uint64
len uint32
}

func NewDictionaryCache(client remotesapi.ChunkStoreServiceClient) *dictionaryCache {
c, err := lru.New2Q[DictionaryKey, *gozstd.DDict](1024)
if err != nil {
panic(err)
}

return &dictionaryCache{
cache: c,
client: client,
dlds: newDownloads(),
}
}

func (dc *dictionaryCache) get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
path := rang.ResourcePath()
off := rang.Ranges[idx].DictionaryOffset
ln := rang.Ranges[idx].DictionaryLength

key := DictionaryKey{path, off, ln}
if dict, ok := dc.cache.Get(key); ok {
return dict, nil
}

// Check for an in-flight request. The default dictionary will be requested many times, so we want to avoid
// making multiple requests for the same resource.
if ch, loaded := dc.pending.LoadOrStore(key, make(chan struct{})); loaded {
// There's an ongoing fetch; wait for it to complete.
<-ch.(chan struct{})
if dict, ok := dc.cache.Get(key); ok {
return dict, nil
}
return nil, errors.New("failed to fetch dictionary due to in-flight request")
}
// When the fetch is done, regardless of success or failure, we need to unblock anyone waiting.
defer func() {
if ch, found := dc.pending.LoadAndDelete(key); found {
close(ch.(chan struct{}))
}
}()

// Fetch the dictionary
ddict, err := dc.fetchDictionary(rang, idx, stats, recorder)
if err != nil {
return nil, err
}

// Store the dictionary in the cache
dc.cache.Add(key, ddict)

return ddict, nil
}
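
The pending map gives get single-flight behavior: the first caller to LoadOrStore a key performs the fetch, while later callers block on the stored channel and then re-check the cache. Below is a stripped-down, generic sketch of the same pattern (hypothetical names; a mutex in place of sync.Map; stdlib only).

// flightGroup deduplicates concurrent loads of the same key.
type flightGroup[K comparable, V any] struct {
	mu      sync.Mutex
	done    map[K]V
	pending map[K]chan struct{}
}

func newFlightGroup[K comparable, V any]() *flightGroup[K, V] {
	return &flightGroup[K, V]{done: map[K]V{}, pending: map[K]chan struct{}{}}
}

func (g *flightGroup[K, V]) load(k K, fetch func() (V, error)) (V, error) {
	g.mu.Lock()
	if v, ok := g.done[k]; ok {
		g.mu.Unlock()
		return v, nil
	}
	if ch, ok := g.pending[k]; ok {
		// Someone else is already fetching; wait, then re-check the cache.
		g.mu.Unlock()
		<-ch
		g.mu.Lock()
		defer g.mu.Unlock()
		if v, ok := g.done[k]; ok {
			return v, nil
		}
		var zero V
		return zero, errors.New("in-flight fetch failed")
	}
	ch := make(chan struct{})
	g.pending[k] = ch
	g.mu.Unlock()

	v, err := fetch()

	g.mu.Lock()
	if err == nil {
		g.done[k] = v
	}
	delete(g.pending, k)
	g.mu.Unlock()
	close(ch) // unblock waiters whether or not the fetch succeeded

	return v, err
}

For production use, golang.org/x/sync/singleflight packages the same idea.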

func (dc *dictionaryCache) fetchDictionary(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
path := rang.ResourcePath()
off := rang.Ranges[idx].DictionaryOffset
ln := rang.Ranges[idx].DictionaryLength

ctx := context.Background()
pathToUrl := dc.dlds.refreshes[path]
if pathToUrl == nil {
// We manually construct the RangeChunk and DownloadLoc in this case because we are retrieving the dictionary span.
// We'll make a single span request, and consume the entire response to create the dictionary.
sRang := &remotesapi.HttpGetRange{}
sRang.Url = rang.Url
sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
loc := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
dl := &remotesapi.DownloadLoc{Location: loc}

refresh := new(locationRefresh)
refresh.Add(dl)
dc.dlds.refreshes[path] = refresh
pathToUrl = refresh
}

urlF := func(lastError error) (string, error) {
earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
if err != nil {
return "", err
}
if earl == "" {
earl = path
}
return earl, nil
}

resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
Fetcher: globalHttpFetcher,
Offset: off,
Length: uint64(ln),
UrlFact: urlF,
Stats: stats,
Health: recorder,
BackOffFact: func(ctx context.Context) backoff.BackOff {
return downloadBackOff(ctx, 3) // params.DownloadRetryCount
},
Throughput: reliable.MinimumThroughputCheck{
CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
NumIntervals: defaultRequestParams.ThroughputMinimumNumIntervals,
},
RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
})
defer resp.Close()

buf := make([]byte, ln)
_, err := io.ReadFull(resp.Body, buf)
if err != nil {
return nil, err
}

// Dictionaries are themselves compressed, but with vanilla zstd, so no dictionary is needed to decompress them.
rawDict, err := gozstd.Decompress(nil, buf)
if err != nil {
return nil, err
}

return gozstd.NewDDict(rawDict)
}