-
Notifications
You must be signed in to change notification settings - Fork 12
Add resumable downloads for Azure & HTTP datastores #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,8 @@ import ( | |
|
|
||
| const ( | ||
| // SingleMB contains chunk size | ||
| SingleMB int64 = 4 * 1024 * 1024 | ||
| SingleMB int64 = 1 * 1024 * 1024 | ||
| chunkSize int64 = 4 * SingleMB | ||
| parallelism = 16 | ||
| ) | ||
|
|
||
|
|
@@ -203,7 +204,7 @@ func DeleteAzureBlob( | |
| // 1. Open/create local file. | ||
| // 2. Reuse existing downloaded parts (doneParts) if resuming. | ||
| // 3. Uses DoBatchTransfer: | ||
| // a. Splits download into SingleMB (1 MB) chunks. | ||
| // a. Splits download into chunkSize (4 MB) chunks. | ||
| // b. Downloads 16 parts in parallel (parallelism). | ||
| // c. Uses buffer pool (sync.Pool) to reuse memory. | ||
| // d. Tracks progress and sends updates via prgNotify. | ||
|
|
@@ -249,8 +250,20 @@ func DownloadAzureBlob( | |
| } | ||
| defer f.Close() | ||
|
|
||
| totalChunks := int((objSize + SingleMB - 1) / SingleMB) | ||
| progress := int64(0) | ||
| totalChunks := int((objSize + chunkSize - 1) / chunkSize) | ||
| // Build a fast lookup for completed chunks and compute accurate progress | ||
| doneChunks := make([]bool, totalChunks) | ||
| var progress int64 | ||
| for _, p := range doneParts.Parts { | ||
| idx := int(p.Ind) | ||
| if 0 <= idx && idx < totalChunks && !doneChunks[idx] { | ||
| doneChunks[idx] = true | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand why you need an array. It makes sense to have an array if it is possible to have holes, but I don't see that (perhaps it is outside of this PR?). Instead you can just store There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is safer that way. We download 16 chunks concurrently so it might be the case that we have holes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, that makes sense to me. |
||
| progress += p.Size // real bytes written | ||
| } | ||
| } | ||
| stats.Asize = progress | ||
| stats.Size = objSize | ||
|
|
||
| errCh := make(chan error, totalChunks) | ||
| mu := &sync.Mutex{} | ||
| var wg sync.WaitGroup | ||
|
|
@@ -263,8 +276,11 @@ func DownloadAzureBlob( | |
| } | ||
|
|
||
| for chunkIndex := i; chunkIndex < endChunk; chunkIndex++ { | ||
| start := int64(chunkIndex) * SingleMB | ||
| end := start + SingleMB - 1 | ||
| if doneChunks[chunkIndex] { // already downloaded in a previous run | ||
| continue | ||
| } | ||
| start := int64(chunkIndex) * chunkSize | ||
| end := start + chunkSize - 1 | ||
| if end >= objSize { | ||
| end = objSize - 1 | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -493,6 +493,158 @@ func testHTTPDatastoreRepeat(t *testing.T) { | |||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| func TestHTTPDatastoreResume(t *testing.T) { | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for adding a test :-) |
||||||||
| if err := setup(); err != nil { | ||||||||
| t.Fatalf("setup error: %v", err) | ||||||||
| } | ||||||||
| if err := os.MkdirAll(httpDownloadDir, 0755); err != nil { | ||||||||
| t.Fatalf("unable to make download directory: %v", err) | ||||||||
| } | ||||||||
| if err := os.MkdirAll(nettraceFolder, 0755); err != nil { | ||||||||
| t.Fatalf("unable to make nettrace log directory: %v", err) | ||||||||
| } | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
just to be sure that if you run the test a second time, it does not interfere with leftovers from the first run Perhaps the same for |
||||||||
| defer os.RemoveAll(nettraceFolder) | ||||||||
|
|
||||||||
| const totalSize = 1024 * 1024 * 50 // 50 MiB | ||||||||
| const preSize = 1024 * 1024 * 20 // 20 MiB already downloaded | ||||||||
|
|
||||||||
| // Helper to create server + source file | ||||||||
| makeServerAndFile := func(ignoreRange bool) (ts *httptest.Server, tempDir, filename, infile string, inHash string, cleanup func(), err error) { | ||||||||
| tempDir = t.TempDir() | ||||||||
|
|
||||||||
| filename = "big.bin" | ||||||||
| infile = filepath.Join(tempDir, filename) | ||||||||
| _, inHash, e := createRandomFile(infile, totalSize) | ||||||||
| if e != nil { | ||||||||
| err = fmt.Errorf("unable to create random file %s: %v", infile, e) | ||||||||
| return nil, "", "", "", "", nil, err | ||||||||
| } | ||||||||
|
|
||||||||
| r := chi.NewRouter() | ||||||||
| r.Get("/*", func(w http.ResponseWriter, r *http.Request) { | ||||||||
| path := filepath.Join(tempDir, r.URL.Path) | ||||||||
| f, e := os.Open(path) | ||||||||
| if e != nil { | ||||||||
| w.WriteHeader(http.StatusNotFound) | ||||||||
| return | ||||||||
| } | ||||||||
| defer f.Close() | ||||||||
|
|
||||||||
| // Serve with or without range support | ||||||||
| if ignoreRange { | ||||||||
| // deliberately ignore client's Range | ||||||||
| r.Header.Del("Range") | ||||||||
| } | ||||||||
| http.ServeContent(w, r, filepath.Base(path), time.Now(), f) | ||||||||
| }) | ||||||||
| ts = httptest.NewServer(r) | ||||||||
|
|
||||||||
| cleanup = func() { | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see you like lambdas, so your lambda returns a lambda :D |
||||||||
| ts.Close() | ||||||||
| _ = os.Remove(infile) | ||||||||
| } | ||||||||
| return | ||||||||
| } | ||||||||
|
|
||||||||
| // Pre-create partial local file (simulate previous interrupted run) | ||||||||
| makePartialLocal := func(target, src string, n int64) error { | ||||||||
| if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { | ||||||||
| return err | ||||||||
| } | ||||||||
| sf, err := os.Open(src) | ||||||||
| if err != nil { | ||||||||
| return err | ||||||||
| } | ||||||||
| defer sf.Close() | ||||||||
| df, err := os.Create(target) | ||||||||
| if err != nil { | ||||||||
| return err | ||||||||
| } | ||||||||
| defer df.Close() | ||||||||
| _, err = io.CopyN(df, sf, n) | ||||||||
| if err != nil { | ||||||||
| return err | ||||||||
| } | ||||||||
| return nil | ||||||||
| } | ||||||||
|
|
||||||||
| // Range-capable server (206) — should request bytes=preSize- and finish | ||||||||
| t.Run("ResumeWithRangeServer", func(t *testing.T) { | ||||||||
| ts, _, filename, infile, inHash, cleanup, err := makeServerAndFile(false) | ||||||||
| if err != nil { | ||||||||
| t.Fatalf("%v", err) | ||||||||
| } | ||||||||
| defer cleanup() | ||||||||
|
|
||||||||
| target := filepath.Join(httpDownloadDir, "resume-range.bin") | ||||||||
| defer os.Remove(target) | ||||||||
|
|
||||||||
| if err := makePartialLocal(target, infile, preSize); err != nil { | ||||||||
| t.Fatalf("prep partial local: %v", err) | ||||||||
| } | ||||||||
|
|
||||||||
| // Download should resume and complete | ||||||||
| status, msg := operationHTTP(t, zedUpload.SyncOpDownload, ts.URL, "", filename, target, false) | ||||||||
| if status { | ||||||||
| t.Fatalf("resume download failed: %v", msg) | ||||||||
| } | ||||||||
|
|
||||||||
| // Verify size and hash | ||||||||
| st, err := os.Stat(target) | ||||||||
| if err != nil { | ||||||||
| t.Fatalf("stat target: %v", err) | ||||||||
| } | ||||||||
| if st.Size() != int64(totalSize) { | ||||||||
| t.Fatalf("expected %d bytes, got %d", totalSize, st.Size()) | ||||||||
| } | ||||||||
| outHash, err := sha256File(target) | ||||||||
| if err != nil { | ||||||||
| t.Fatalf("hash target: %v", err) | ||||||||
| } | ||||||||
| if outHash != inHash { | ||||||||
| t.Fatalf("hash mismatch: src %s dst %s", inHash, outHash) | ||||||||
| } | ||||||||
| }) | ||||||||
|
|
||||||||
| // Server ignores Range (always 200) — client should discard preSize and still produce a correct file | ||||||||
| t.Run("ResumeNoRangeServer_DiscardPrefix", func(t *testing.T) { | ||||||||
| ts, _, filename, infile, inHash, cleanup, err := makeServerAndFile(true /* ignoreRange */) | ||||||||
| if err != nil { | ||||||||
| t.Fatalf("%v", err) | ||||||||
| } | ||||||||
| defer cleanup() | ||||||||
|
|
||||||||
| target := filepath.Join(httpDownloadDir, "resume-norange.bin") | ||||||||
| defer os.Remove(target) | ||||||||
|
|
||||||||
| if err := makePartialLocal(target, infile, preSize); err != nil { | ||||||||
| t.Fatalf("prep partial local: %v", err) | ||||||||
| } | ||||||||
|
|
||||||||
| // Download should "resume" by discarding preSize from body and appending remainder | ||||||||
| status, msg := operationHTTP(t, zedUpload.SyncOpDownload, ts.URL, "", filename, target, false) | ||||||||
| if status { | ||||||||
| t.Fatalf("resume download failed: %v", msg) | ||||||||
| } | ||||||||
|
|
||||||||
| // Verify size and hash | ||||||||
| st, err := os.Stat(target) | ||||||||
| if err != nil { | ||||||||
| t.Fatalf("stat target: %v", err) | ||||||||
| } | ||||||||
| if st.Size() != int64(totalSize) { | ||||||||
| t.Fatalf("expected %d bytes, got %d", totalSize, st.Size()) | ||||||||
| } | ||||||||
| outHash, err := sha256File(target) | ||||||||
| if err != nil { | ||||||||
| t.Fatalf("hash target: %v", err) | ||||||||
| } | ||||||||
| if outHash != inHash { | ||||||||
| t.Fatalf("hash mismatch: src %s dst %s", inHash, outHash) | ||||||||
| } | ||||||||
| }) | ||||||||
| } | ||||||||
|
|
||||||||
| func sha256File(filePath string) (string, error) { | ||||||||
| hasher := sha256.New() | ||||||||
| f, err := os.Open(filePath) | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually had
SingleMBdefined for 4MB? 🤦♂️