diff --git a/zedUpload/azureutil/azure.go b/zedUpload/azureutil/azure.go index 5b44128..dd71bf3 100644 --- a/zedUpload/azureutil/azure.go +++ b/zedUpload/azureutil/azure.go @@ -28,7 +28,8 @@ import ( const ( // SingleMB contains chunk size - SingleMB int64 = 4 * 1024 * 1024 + SingleMB int64 = 1 * 1024 * 1024 + chunkSize int64 = 4 * SingleMB parallelism = 16 ) @@ -203,7 +204,7 @@ func DeleteAzureBlob( // 1. Open/create local file. // 2. Reuse existing downloaded parts (doneParts) if resuming. // 3. Uses DoBatchTransfer: -// a. Splits download into SingleMB (1 MB) chunks. +// a. Splits download into chunkSize (4 MB) chunks. // b. Downloads 16 parts in parallel (parallelism). // c. Uses buffer pool (sync.Pool) to reuse memory. // d. Tracks progress and sends updates via prgNotify. @@ -249,8 +250,20 @@ func DownloadAzureBlob( } defer f.Close() - totalChunks := int((objSize + SingleMB - 1) / SingleMB) - progress := int64(0) + totalChunks := int((objSize + chunkSize - 1) / chunkSize) + // Build a fast lookup for completed chunks and compute accurate progress + doneChunks := make([]bool, totalChunks) + var progress int64 + for _, p := range doneParts.Parts { + idx := int(p.Ind) + if 0 <= idx && idx < totalChunks && !doneChunks[idx] { + doneChunks[idx] = true + progress += p.Size // real bytes written + } + } + stats.Asize = progress + stats.Size = objSize + errCh := make(chan error, totalChunks) mu := &sync.Mutex{} var wg sync.WaitGroup @@ -263,8 +276,11 @@ func DownloadAzureBlob( } for chunkIndex := i; chunkIndex < endChunk; chunkIndex++ { - start := int64(chunkIndex) * SingleMB - end := start + SingleMB - 1 + if doneChunks[chunkIndex] { // already downloaded in a previous run + continue + } + start := int64(chunkIndex) * chunkSize + end := start + chunkSize - 1 if end >= objSize { end = objSize - 1 } diff --git a/zedUpload/datastore_http.go b/zedUpload/datastore_http.go index 313ecdf..c2aae71 100644 --- a/zedUpload/datastore_http.go +++ b/zedUpload/datastore_http.go @@ 
-120,8 +120,10 @@ func (ep *HttpTransportMethod) processHttpUpload(req *DronaRequest) (error, int) if err != nil { return err, 0 } + + doneParts := req.GetDoneParts() stats, resp := zedHttp.ExecCmd(req.cancelContext, "post", postUrl, req.name, - req.objloc, req.sizelimit, prgChan, hClient, ep.inactivityTimeout) + req.objloc, req.sizelimit, prgChan, doneParts, hClient, ep.inactivityTimeout) return stats.Error, resp.BodyLength } @@ -140,8 +142,10 @@ func (ep *HttpTransportMethod) processHttpDownload(req *DronaRequest) (error, in if err != nil { return err, 0 } + + doneParts := req.GetDoneParts() stats, resp := zedHttp.ExecCmd(req.cancelContext, "get", file, "", - req.objloc, req.sizelimit, prgChan, hClient, ep.inactivityTimeout) + req.objloc, req.sizelimit, prgChan, doneParts, hClient, ep.inactivityTimeout) return stats.Error, resp.BodyLength } @@ -162,8 +166,10 @@ func (ep *HttpTransportMethod) processHttpList(req *DronaRequest) ([]string, err if err != nil { return nil, err } + + doneParts := req.GetDoneParts() stats, resp := zedHttp.ExecCmd(req.cancelContext, "ls", listUrl, "", "", - req.sizelimit, prgChan, hClient, ep.inactivityTimeout) + req.sizelimit, prgChan, doneParts, hClient, ep.inactivityTimeout) return resp.List, stats.Error } @@ -182,8 +188,10 @@ func (ep *HttpTransportMethod) processHttpObjectMetaData(req *DronaRequest) (err if err != nil { return err, 0 } + + doneParts := req.GetDoneParts() stats, resp := zedHttp.ExecCmd(req.cancelContext, "meta", file, "", req.objloc, - req.sizelimit, prgChan, hClient, ep.inactivityTimeout) + req.sizelimit, prgChan, doneParts, hClient, ep.inactivityTimeout) return stats.Error, resp.ContentLength } func (ep *HttpTransportMethod) getContext() *DronaCtx { diff --git a/zedUpload/datastore_http_test.go b/zedUpload/datastore_http_test.go index 1fe0d72..9941731 100644 --- a/zedUpload/datastore_http_test.go +++ b/zedUpload/datastore_http_test.go @@ -493,6 +493,158 @@ func testHTTPDatastoreRepeat(t *testing.T) { } } +func 
TestHTTPDatastoreResume(t *testing.T) { + if err := setup(); err != nil { + t.Fatalf("setup error: %v", err) + } + if err := os.MkdirAll(httpDownloadDir, 0755); err != nil { + t.Fatalf("unable to make download directory: %v", err) + } + if err := os.MkdirAll(nettraceFolder, 0755); err != nil { + t.Fatalf("unable to make nettrace log directory: %v", err) + } + defer os.RemoveAll(nettraceFolder) + + const totalSize = 1024 * 1024 * 50 // 50 MiB + const preSize = 1024 * 1024 * 20 // 20 MiB already downloaded + + // Helper to create server + source file + makeServerAndFile := func(ignoreRange bool) (ts *httptest.Server, tempDir, filename, infile string, inHash string, cleanup func(), err error) { + tempDir = t.TempDir() + + filename = "big.bin" + infile = filepath.Join(tempDir, filename) + _, inHash, e := createRandomFile(infile, totalSize) + if e != nil { + err = fmt.Errorf("unable to create random file %s: %v", infile, e) + return nil, "", "", "", "", nil, err + } + + r := chi.NewRouter() + r.Get("/*", func(w http.ResponseWriter, r *http.Request) { + path := filepath.Join(tempDir, r.URL.Path) + f, e := os.Open(path) + if e != nil { + w.WriteHeader(http.StatusNotFound) + return + } + defer f.Close() + + // Serve with or without range support + if ignoreRange { + // deliberately ignore client's Range + r.Header.Del("Range") + } + http.ServeContent(w, r, filepath.Base(path), time.Now(), f) + }) + ts = httptest.NewServer(r) + + cleanup = func() { + ts.Close() + _ = os.Remove(infile) + } + return + } + + // Pre-create partial local file (simulate previous interrupted run) + makePartialLocal := func(target, src string, n int64) error { + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { + return err + } + sf, err := os.Open(src) + if err != nil { + return err + } + defer sf.Close() + df, err := os.Create(target) + if err != nil { + return err + } + defer df.Close() + _, err = io.CopyN(df, sf, n) + if err != nil { + return err + } + return nil + } + + // 
Range-capable server (206) — should request bytes=preSize- and finish + t.Run("ResumeWithRangeServer", func(t *testing.T) { + ts, _, filename, infile, inHash, cleanup, err := makeServerAndFile(false) + if err != nil { + t.Fatalf("%v", err) + } + defer cleanup() + + target := filepath.Join(httpDownloadDir, "resume-range.bin") + defer os.Remove(target) + + if err := makePartialLocal(target, infile, preSize); err != nil { + t.Fatalf("prep partial local: %v", err) + } + + // Download should resume and complete + status, msg := operationHTTP(t, zedUpload.SyncOpDownload, ts.URL, "", filename, target, false) + if status { + t.Fatalf("resume download failed: %v", msg) + } + + // Verify size and hash + st, err := os.Stat(target) + if err != nil { + t.Fatalf("stat target: %v", err) + } + if st.Size() != int64(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, st.Size()) + } + outHash, err := sha256File(target) + if err != nil { + t.Fatalf("hash target: %v", err) + } + if outHash != inHash { + t.Fatalf("hash mismatch: src %s dst %s", inHash, outHash) + } + }) + + // Server ignores Range (always 200) — client should discard preSize and still produce a correct file + t.Run("ResumeNoRangeServer_DiscardPrefix", func(t *testing.T) { + ts, _, filename, infile, inHash, cleanup, err := makeServerAndFile(true /* ignoreRange */) + if err != nil { + t.Fatalf("%v", err) + } + defer cleanup() + + target := filepath.Join(httpDownloadDir, "resume-norange.bin") + defer os.Remove(target) + + if err := makePartialLocal(target, infile, preSize); err != nil { + t.Fatalf("prep partial local: %v", err) + } + + // Download should "resume" by discarding preSize from body and appending remainder + status, msg := operationHTTP(t, zedUpload.SyncOpDownload, ts.URL, "", filename, target, false) + if status { + t.Fatalf("resume download failed: %v", msg) + } + + // Verify size and hash + st, err := os.Stat(target) + if err != nil { + t.Fatalf("stat target: %v", err) + } + if st.Size() != 
int64(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, st.Size()) + } + outHash, err := sha256File(target) + if err != nil { + t.Fatalf("hash target: %v", err) + } + if outHash != inHash { + t.Fatalf("hash mismatch: src %s dst %s", inHash, outHash) + } + }) +} + func sha256File(filePath string) (string, error) { hasher := sha256.New() f, err := os.Open(filePath) diff --git a/zedUpload/httputil/http.go b/zedUpload/httputil/http.go index 515c4b1..cd4c6e5 100644 --- a/zedUpload/httputil/http.go +++ b/zedUpload/httputil/http.go @@ -64,8 +64,15 @@ func getHref(token html.Token) (ok bool, href string) { // ExecCmd performs various commands such as "ls", "get", etc. // Note that "host" needs to contain the URL in the case of a get -func ExecCmd(ctx context.Context, cmd, host, remoteFile, localFile string, objSize int64, - prgNotify types.StatsNotifChan, client *http.Client, inactivityTimeout time.Duration) (types.UpdateStats, Resp) { +func ExecCmd( + ctx context.Context, + cmd, host, remoteFile, localFile string, + objSize int64, + prgNotify types.StatsNotifChan, + doneParts types.DownloadedParts, + client *http.Client, + inactivityTimeout time.Duration, +) (types.UpdateStats, Resp) { if ctx == nil { ctx = context.Background() } @@ -121,7 +128,7 @@ func ExecCmd(ctx context.Context, cmd, host, remoteFile, localFile string, objSi rsp.List = imgList return stats, rsp case "get": - return execCmdGet(ctx, objSize, localFile, host, client, prgNotify, inactivityTimeout) + return execCmdGet(ctx, objSize, localFile, host, client, doneParts, prgNotify, inactivityTimeout) case "post": file, err := os.Open(localFile) if err != nil { @@ -196,8 +203,16 @@ func ExecCmd(ctx context.Context, cmd, host, remoteFile, localFile string, objSi // execCmdGet executes the get command. // NOTE: These **must** be named return values, or our defer to modify them will not work. 
-func execCmdGet(ctx context.Context, objSize int64, localFile string, host string, client *http.Client, prgNotify types.StatsNotifChan, inactivityTimeout time.Duration) (stats types.UpdateStats, rsp Resp) { - var copiedSize int64 +func execCmdGet( + ctx context.Context, + objSize int64, + localFile, host string, + client *http.Client, + doneParts types.DownloadedParts, + prgNotify types.StatsNotifChan, + inactivityTimeout time.Duration, +) (stats types.UpdateStats, rsp Resp) { + copiedSize := doneParts.PartSize stats.Size = objSize dirErr := os.MkdirAll(filepath.Dir(localFile), 0755) @@ -205,12 +220,21 @@ func execCmdGet(ctx context.Context, objSize int64, localFile string, host strin stats.Error = dirErr return stats, Resp{} } - local, fileErr := os.Create(localFile) - if fileErr != nil { - stats.Error = fileErr + + // Local file vanished since progress was recorded: forget that progress + if _, err := os.Stat(localFile); os.IsNotExist(err) { + copiedSize, doneParts = 0, types.DownloadedParts{} + } + + var local *os.File + local, stats.Error = os.OpenFile(localFile, os.O_RDWR|os.O_CREATE, 0644) + if stats.Error != nil { return stats, Resp{} } defer local.Close() + if _, stats.Error = local.Seek(copiedSize, io.SeekStart); stats.Error != nil { + return stats, Resp{} + } var errorList []string defer func() { @@ -218,7 +242,8 @@ func execCmdGet(ctx context.Context, objSize int64, localFile string, host strin stats.Error = fmt.Errorf("%s: %s", host, strings.Join(errorList, "; ")) } }() - supportRange := false //is server supports ranges requests, false for the first request + + supportRange := copiedSize > 0 forceRestart := false delay := time.Second lastModified := "" @@ -240,8 +265,8 @@ func execCmdGet(ctx context.Context, objSize int64, localFile string, host strin } } - // restart from the beginning if server do not support ranges or we forced to restart - if !supportRange || forceRestart { + // restart from the beginning if forced + if forceRestart { err := local.Truncate(0) if err != nil {
appendToErrorList("failed truncate file: %s", err) @@ -290,15 +315,18 @@ func execCmdGet(ctx context.Context, objSize int64, localFile string, host strin // supportRange indicates if server supports range requests supportRange = resp.Header.Get("Accept-Ranges") == "bytes" + if withRange && resp.StatusCode == http.StatusOK { + supportRange = false // server ignored our Range request; discard the prefix below + } //if we not receive StatusOK for request without Range header or StatusPartialContent for request with range //it indicates that server misconfigured - if !withRange && resp.StatusCode != http.StatusOK || withRange && resp.StatusCode != http.StatusPartialContent { - respErr := fmt.Sprintf("bad response code: %d", resp.StatusCode) - appendToErrorList(respErr) - //we do not want to process server misconfiguration here + if (!withRange && resp.StatusCode != http.StatusOK) || + (withRange && supportRange && resp.StatusCode != http.StatusPartialContent) { + appendToErrorList(fmt.Sprintf("bad response code: %d", resp.StatusCode)) break } + newLastModified := resp.Header.Get("Last-Modified") if lastModified != "" && newLastModified != lastModified { // last modified changed, retry from the beginning @@ -311,16 +339,36 @@ func execCmdGet(ctx context.Context, objSize int64, localFile string, host strin // we received StatusOK which is the response for the whole content, not for the partial one rsp.BodyLength = int(resp.ContentLength) } - var written int64 // use the inactivityReader to trigger failure for the timeouts inactivityReader := NewTimeoutReader(inactivityTimeout, resp.Body) - for { - var copyErr error + lenParts := int64(len(doneParts.Parts)) + stats.Asize = copiedSize + stats.DoneParts = doneParts + + // full body (200) received while a prefix already exists on disk: skip it in the stream + if resp.StatusCode == http.StatusOK && copiedSize > 0 { + if _, derr := io.CopyN(io.Discard, inactivityReader, copiedSize); derr != nil { + if errors.Is(derr, io.EOF) { + // We already had the whole file + errorList = nil + return stats, rsp
+ } + appendToErrorList("discard %d failed: %v", copiedSize, derr) + continue + } + // From here on, we append the remainder. + } - written, copyErr = io.CopyN(local, inactivityReader, chunkSize) + for partIdx := lenParts; ; partIdx++ { + written, copyErr := io.CopyN(local, inactivityReader, chunkSize) copiedSize += written stats.Asize = copiedSize + stats.DoneParts.Parts = append(stats.DoneParts.Parts, &types.PartDefinition{ + Ind: partIdx, + Size: written, + }) + stats.DoneParts.PartSize = copiedSize // possible situations: // err != nil && err == io.EOF - end of file, wrap up and return