diff --git a/apis/swagger.yml b/apis/swagger.yml
index 602d08874..477497f95 100644
--- a/apis/swagger.yml
+++ b/apis/swagger.yml
@@ -1441,6 +1441,31 @@ definitions:
       items:
         $ref: "#/definitions/Node"
 
+  HeartBeatRequest:
+    type: "object"
+    description: ""
+    properties:
+      IP:
+        type: "string"
+        description: "IP address which the peer client carries"
+        format: "string"
+      port:
+        type: "integer"
+        description: |
+          when registering, dfget will set up one uploader process.
+          This one acts as a server for peer pulling tasks.
+          This is the port on which that server listens.
+        format: "int32"
+        minimum: 15000
+        maximum: 65000
+      cID:
+        type: "string"
+        description: |
+          CID means the client ID. It maps to the specific dfget process.
+          When a user wishes to download an image/file, the user starts a dfget process to do it.
+          This dfget is treated as a client and carries a client ID.
+          Thus, multiple dfget processes on the same peer have different CIDs.
+
   ErrorResponse:
     type: "object"
     description: |
diff --git a/apis/types/heart_beat_request.go b/apis/types/heart_beat_request.go
new file mode 100644
index 000000000..cfe04bfab
--- /dev/null
+++ b/apis/types/heart_beat_request.go
@@ -0,0 +1,86 @@
+// Code generated by go-swagger; DO NOT EDIT.
+
+package types
+
+// This file was generated by the swagger tool.
+// Editing this file might prove futile when you re-run the swagger generate command
+
+import (
+	strfmt "github.com/go-openapi/strfmt"
+
+	"github.com/go-openapi/errors"
+	"github.com/go-openapi/swag"
+	"github.com/go-openapi/validate"
+)
+
+// HeartBeatRequest heart beat request
+// swagger:model HeartBeatRequest
+type HeartBeatRequest struct {
+
+	// IP address which the peer client carries
+	IP string `json:"IP,omitempty"`
+
+	// CID means the client ID. It maps to the specific dfget process.
+	// When a user wishes to download an image/file, the user starts a dfget process to do it.
+	// This dfget is treated as a client and carries a client ID.
+	// Thus, multiple dfget processes on the same peer have different CIDs.
+	//
+	CID string `json:"cID,omitempty"`
+
+	// when registering, dfget will set up one uploader process.
+	// This one acts as a server for peer pulling tasks.
+	// This is the port on which that server listens.
+	//
+	// Maximum: 65000
+	// Minimum: 15000
+	Port int32 `json:"port,omitempty"`
+}
+
+// Validate validates this heart beat request
+func (m *HeartBeatRequest) Validate(formats strfmt.Registry) error {
+	var res []error
+
+	if err := m.validatePort(formats); err != nil {
+		res = append(res, err)
+	}
+
+	if len(res) > 0 {
+		return errors.CompositeValidationError(res...)
+	}
+	return nil
+}
+
+func (m *HeartBeatRequest) validatePort(formats strfmt.Registry) error {
+
+	if swag.IsZero(m.Port) { // not required
+		return nil
+	}
+
+	if err := validate.MinimumInt("port", "body", int64(m.Port), 15000, false); err != nil {
+		return err
+	}
+
+	if err := validate.MaximumInt("port", "body", int64(m.Port), 65000, false); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// MarshalBinary interface implementation
+func (m *HeartBeatRequest) MarshalBinary() ([]byte, error) {
+	if m == nil {
+		return nil, nil
+	}
+	return swag.WriteJSON(m)
+}
+
+// UnmarshalBinary interface implementation
+func (m *HeartBeatRequest) UnmarshalBinary(b []byte) error {
+	var res HeartBeatRequest
+	if err := swag.ReadJSON(b, &res); err != nil {
+		return err
+	}
+	*m = res
+	return nil
+}
diff --git a/cmd/dfdaemon/app/root.go b/cmd/dfdaemon/app/root.go
index beb973552..06e1bd385 100644
--- a/cmd/dfdaemon/app/root.go
+++ b/cmd/dfdaemon/app/root.go
@@ -18,6 +18,7 @@ package app
 
 import (
 	"encoding/json"
+	"github.com/dragonflyoss/Dragonfly/dfdaemon/localManager"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -68,6 +69,9 @@ var rootCmd = &cobra.Command{
 		cfgJSON, _ := json.Marshal(cfg)
 		logrus.Infof("using config: %s", cfgJSON)
+
+		_ = localManager.NewLocalManager(cfg.DFGetConfig())
+
 		s, err := dfdaemon.NewFromConfig(*cfg)
 		if err != nil {
 			return errors.Wrap(err, "create dfdaemon from config")
diff --git a/dfdaemon/config/config.go b/dfdaemon/config/config.go
index 4c783fbad..3c84b3590 100644
--- a/dfdaemon/config/config.go
+++ b/dfdaemon/config/config.go
@@ -113,6 +113,9 @@ type Properties struct {
 	LogConfig dflog.LogConfig `yaml:"logConfig" json:"logConfig"`
 	LocalIP   string          `yaml:"localIP" json:"localIP"`
 	PeerPort  int             `yaml:"peerPort" json:"peerPort"`
+
+	// Extreme configures the optional extreme mode; it is nil when the section is absent.
+	Extreme *ExtremeConfig `yaml:"extreme" json:"extreme"`
 }
 
 // Validate validates the config
@@ -145,13 +148,14 @@ func (p *Properties) DFGetConfig() DFGetConfig {
 	}
 
 	dfgetConfig := DFGetConfig{
 		DfgetFlags: dfgetFlags,
 		SuperNodes: p.SuperNodes,
 		RateLimit:  p.RateLimit.String(),
 		DFRepo:     p.DFRepo,
 		DFPath:     p.DFPath,
 		LocalIP:    p.LocalIP,
 		PeerPort:   p.PeerPort,
 	}
+	// p.Extreme may be nil when the extreme section is absent from the YAML config
+	if p.Extreme != nil {
+		dfgetConfig.SpecKeyOfExtremeTaskID = p.Extreme.SpecKeyOfTaskID
+	}
 	if p.HijackHTTPS != nil {
 		dfgetConfig.HostsConfig = p.HijackHTTPS.Hosts
@@ -166,19 +170,25 @@ func (p *Properties) DFGetConfig() DFGetConfig {
 			})
 		}
 	}
+
+	if dfgetConfig.SpecKeyOfExtremeTaskID == "" {
+		dfgetConfig.SpecKeyOfExtremeTaskID = constant.DefaultSpecKeyOfExtremeTaskID
+	}
+
 	return dfgetConfig
 }
 
 // DFGetConfig configures how dfdaemon calls dfget.
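+// SpecKeyOfExtremeTaskID names the request header that carries the task ID in extreme mode.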
type DFGetConfig struct { - DfgetFlags []string `yaml:"dfget_flags"` - SuperNodes []string `yaml:"supernodes"` - RateLimit string `yaml:"ratelimit"` - DFRepo string `yaml:"localrepo"` - DFPath string `yaml:"dfpath"` - HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"` - PeerPort int `yaml:"peerPort"` - LocalIP string `yaml:"localIP"` + DfgetFlags []string `yaml:"dfget_flags"` + SuperNodes []string `yaml:"supernodes"` + RateLimit string `yaml:"ratelimit"` + DFRepo string `yaml:"localrepo"` + DFPath string `yaml:"dfpath"` + HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"` + PeerPort int `yaml:"peerPort"` + LocalIP string `yaml:"localIP"` + SpecKeyOfExtremeTaskID string `yaml:"specKeyOfTaskID"` } // RegistryMirror configures the mirror of the official docker registry @@ -412,3 +422,11 @@ func NewProxy(regx string, useHTTPS bool, direct bool, redirect string) (*Proxy, func (r *Proxy) Match(url string) bool { return r.Regx != nil && r.Regx.MatchString(url) } + +// ExtremeConfig represents the config of extreme mode +type ExtremeConfig struct { + // SpecKeyOfExtremeTaskID defines the header key which represents the taskID + SpecKeyOfTaskID string `yaml:"specKeyOfTaskID" json:"specKeyOfTaskID"` + + SpecKeyOfDirectRet string `yaml:"specKeyOfDirectRet" json:"specKeyOfDirectRet"` +} diff --git a/dfdaemon/constant/constant.go b/dfdaemon/constant/constant.go index 94ec17603..16eb2a091 100644 --- a/dfdaemon/constant/constant.go +++ b/dfdaemon/constant/constant.go @@ -43,6 +43,10 @@ const ( const ( // DefaultConfigPath is the default path of dfdaemon configuration file. DefaultConfigPath = "/etc/dragonfly/dfdaemon.yml" + + // DefaultSpecKeyOfExtremeTaskID is the default value of key which represents + // the taskID in extreme mode. + DefaultSpecKeyOfExtremeTaskID = "X-extreme-key" ) const ( diff --git a/dfdaemon/downloader/p2p/dfclient.go b/dfdaemon/downloader/p2p/dfclient.go index a2be39e17..7532a954a 100644 --- a/dfdaemon/downloader/p2p/dfclient.go +++ b/dfdaemon/downloader/p2p/dfclient.go @@ -19,12 +19,9 @@ package p2p import ( "context" "fmt" - "github.com/dragonflyoss/Dragonfly/pkg/httputils" "io" - "net/http" "os" "path/filepath" - "strconv" "strings" "time" @@ -43,11 +40,6 @@ type DFClient struct { dfClient core.DFGet } -type DigestStruct struct { - Digest string - httputils.RangeStruct -} - func (c *DFClient) DownloadContext(ctx context.Context, url string, header map[string][]string, name string) (string, error) { // startTime := time.Now() dstPath := filepath.Join(c.config.DFRepo, name) @@ -107,27 +99,11 @@ func (c *DFClient) doDownload(ctx context.Context, url string, header map[string runtimeConfig.URL = url runtimeConfig.RV.TaskURL = url runtimeConfig.RV.TaskFileName = getTaskFileName(destPath, c.dfGetConfig.Sign) - runtimeConfig.Header = flattenHeader(header) + runtimeConfig.Header = FlattenHeader(header) runtimeConfig.Output = destPath runtimeConfig.RV.RealTarget = destPath runtimeConfig.RV.TargetDir = filepath.Dir(destPath) - hr := http.Header(header) - if digestHeaderStr := hr.Get(dfgetcfg.StrDigest); digestHeaderStr != "" { - ds, err := getDigest(digestHeaderStr) - if err != nil { - return nil, err - } - - // todo: support the merge request - if len(ds) != 1 { - return nil, fmt.Errorf("no support to merge request") - } - - runtimeConfig.RV.Digest = ds[0].Digest - runtimeConfig.RV.FileLength = (ds[0].EndIndex - ds[0].StartIndex) + 1 - } - return c.dfClient.GetReader(ctx, &runtimeConfig) } @@ -139,7 +115,7 @@ func getCid(localIP string, sign string) string { return localIP + "-" 
+ sign } -func flattenHeader(header map[string][]string) []string { +func FlattenHeader(header map[string][]string) []string { var res []string for key, value := range header { // discard HTTP host header for backing to source successfully @@ -156,61 +132,3 @@ func flattenHeader(header map[string][]string) []string { } return res } - -func getDigest(digestHeaderStr string) ([]*DigestStruct, error) { - var ( - digest string - rangeStr string - ) - - // digestHeaderStr looks like "sha256_1:0,1000;sha256_2:1001,2000" - - result := []*DigestStruct{} - - arr := strings.Split(digestHeaderStr, ";") - for _, elem := range arr { - kv := strings.Split(elem, ":") - if len(kv) > 3 || len(kv) < 2 { - return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr) - } - - if len(kv) == 2 { - digest = fmt.Sprintf("sha256:%s", kv[0]) - rangeStr = kv[1] - } - - if len(kv) == 3 { - digest = fmt.Sprintf("%s:%s", kv[0], kv[1]) - rangeStr = kv[2] - } - - // todo: verify the sha256 string - - rangeIndex := strings.Split(rangeStr, ",") - if len(rangeIndex) != 2 { - return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr) - } - - startIndex, err := strconv.ParseInt(rangeIndex[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr) - } - - endIndex, err := strconv.ParseInt(rangeIndex[1], 10, 64) - if err != nil { - return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr) - } - - ds := &DigestStruct{ - Digest: digest, - RangeStruct: httputils.RangeStruct{ - StartIndex: startIndex, - EndIndex: endIndex, - }, - } - - result = append(result, ds) - } - - return result, nil -} diff --git a/dfdaemon/downloader/p2p/dfclient_test.go b/dfdaemon/downloader/p2p/dfclient_test.go index db1d4d01c..91db43846 100644 --- a/dfdaemon/downloader/p2p/dfclient_test.go +++ b/dfdaemon/downloader/p2p/dfclient_test.go @@ -63,7 +63,7 @@ func (tc *testcase) WithValue(key string, value ...string) *testcase { func (tc *testcase) Test(t *testing.T) { a := assert.New(t) - actualResult := flattenHeader(tc.header) + actualResult := FlattenHeader(tc.header) for _, r := range actualResult { if _, ok := tc.result[r]; !ok { diff --git a/dfdaemon/localManager/cache.go b/dfdaemon/localManager/cache.go new file mode 100644 index 000000000..1664edf33 --- /dev/null +++ b/dfdaemon/localManager/cache.go @@ -0,0 +1,12 @@ +package localManager + +import "github.com/dragonflyoss/Dragonfly/pkg/syncmap" + +type cache struct { + +} + +type cacheManager struct { + // key is taskID, value is the + taskContainer *syncmap.SyncMap +} \ No newline at end of file diff --git a/dfdaemon/localManager/local_downloader.go b/dfdaemon/localManager/local_downloader.go new file mode 100644 index 000000000..a832589b6 --- /dev/null +++ b/dfdaemon/localManager/local_downloader.go @@ -0,0 +1,293 @@ +package localManager + +import ( + "context" + "fmt" + "github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/p2p" + "github.com/dragonflyoss/Dragonfly/dfdaemon/transport" + "github.com/dragonflyoss/Dragonfly/dfget/config" + "github.com/dragonflyoss/Dragonfly/dfget/core/api" + "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader" + "github.com/dragonflyoss/Dragonfly/dfget/types" + "github.com/dragonflyoss/Dragonfly/pkg/constants" + "github.com/dragonflyoss/Dragonfly/pkg/queue" + "github.com/dragonflyoss/Dragonfly/pkg/ratelimiter" + "io" + "os" + "path/filepath" + "time" + + "github.com/pborman/uuid" + "github.com/sirupsen/logrus" +) + +type downloadNodeInfo struct { + ip 
string + port int + path string + + peerID string + // source url? + directSource bool + url string + header map[string][]string + // is download from local + local bool +} + +// LocalDownloader will download the file, copy to stream and to local file system. +type LocalDownloader struct { + selectNodes []*downloadNodeInfo + config *config.Config + queue queue.Queue + clientQueue queue.Queue + + taskID string + length int64 + + // super node ip, may be "" + node string + url string + header map[string][]string + + outPath string + systemDataDir string + taskFileName string + + superAPI api.SupernodeAPI + downloadAPI api.DownloadAPI + uploaderAPI api.UploaderAPI + + // postNotifyUploader should be called after notify the local uploader finish + postNotifyUploader func(req *api.FinishTaskRequest) +} + +func NewLocalDownloader() *LocalDownloader { + return &LocalDownloader{ + queue: queue.NewQueue(10), + clientQueue: queue.NewQueue(10), + } +} + +func (ld *LocalDownloader) RunStream(ctx context.Context) (io.Reader, error) { + csw := downloader.NewClientStreamWriter(ctx, ld.clientQueue, ld.superAPI, ld.config, true, ld.length) + go func() { + err := ld.run(ctx, csw) + if err != nil { + logrus.Warnf("P2PDownloader run error: %s", err) + } + }() + return csw, nil +} + +func (ld *LocalDownloader) run(ctx context.Context, pieceWriter downloader.PieceWriter) error { + var ( + lastErr error + ) + + // start PieceWriter + if err := pieceWriter.PreRun(ctx); err != nil { + return err + } + go func() { + pieceWriter.Run(ctx) + }() + + for _, info := range ld.selectNodes { + ld.processPiece(ctx, info) + success, err := ld.processItem(ctx, info) + if !success { + lastErr = err + continue + } + + if success { + return nil + } + } + + return lastErr +} + +// +func (ld *LocalDownloader) processItem(ctx context.Context, info *downloadNodeInfo) (success bool, err error) { + for { + v, ok := ld.queue.PollTimeout(2 * time.Second) + if ! 
ok { + continue + } + + item := v.(*downloader.Piece) + if item.Result == constants.ResultFail || item.Result == constants.ResultInvalid { + // todo: get client error + return false, fmt.Errorf("failed to download from %s", item.DstCid) + } + + go ld.finishTask(item, info) + return true, nil + } + +} + +// task is finished, try to download to local file system and report to super node +func (ld *LocalDownloader) finishTask(piece *downloader.Piece, info *downloadNodeInfo) { + if !info.local { + tmpPath := filepath.Join(ld.systemDataDir, uuid.New()) + f, err := os.OpenFile(tmpPath, os.O_TRUNC | os.O_WRONLY | os.O_CREATE, 0664) + if err != nil { + logrus.Warnf("failed to open tmp path: %v", err) + return + } + + _, err = io.Copy(f, piece.RawContent()) + if err != nil { + f.Close() + logrus.Warnf("failed to write tmp path: %v", err) + return + } + + f.Close() + + err = os.Rename(tmpPath, ld.outPath) + if err != nil { + logrus.Warnf("failed to rename: %v", err) + return + } + } + + ld.reportResource(info) +} + +func (ld *LocalDownloader) reportResource(info *downloadNodeInfo) { + // report to supernode + registerReq := &types.RegisterRequest{ + RawURL: ld.url, + TaskURL: ld.url, + TaskId: ld.taskID, + FileLength: ld.length, + Insecure: ld.config.Insecure, + Dfdaemon: ld.config.DFDaemon, + Path: ld.taskFileName, + IP: ld.config.RV.LocalIP, + Port: ld.config.RV.PeerPort, + Cid: ld.config.RV.Cid, + Headers: p2p.FlattenHeader(ld.header), + Md5: ld.config.Md5, + Identifier: ld.config.Identifier, + } + + if info.local { + registerReq.Path = info.path + } + + reportSuperNode := "" + + for _, node := range ld.config.Nodes { + registerReq.SupernodeIP = node + resp, err := ld.superAPI.ReportResource(node, registerReq) + if err != nil { + logrus.Error(err) + } + + if err == nil && resp.Code == constants.Success { + logrus.Infof("success to report resource %v to supernode", registerReq) + reportSuperNode = node + break + } + } + + if reportSuperNode == "" { + return + } + + if info.local { + logrus.Infof("success to download task %s from local", ld.taskID) + return + } + + // notify local uploader + finishTaskReq := &api.FinishTaskRequest{ + TaskFileName: ld.taskFileName, + TaskID: ld.taskID, + Node: reportSuperNode, + ClientID: ld.config.RV.Cid, + Other: api.FinishTaskOther{ + RawURL: registerReq.RawURL, + TaskURL: registerReq.TaskURL, + FileLength: registerReq.FileLength, + Headers: registerReq.Headers, + SpecReport: true, + }, + } + err := ld.uploaderAPI.FinishTask(ld.config.RV.LocalIP, ld.config.RV.PeerPort, finishTaskReq) + if err != nil { + logrus.Errorf("failed to finish task %v for uploader: %v", finishTaskReq, err) + }else{ + logrus.Infof("success to finish task %v for uploader", finishTaskReq) + if ld.postNotifyUploader != nil { + ld.postNotifyUploader(finishTaskReq) + } + } +} + +func (ld *LocalDownloader) processPiece(ctx context.Context, info* downloadNodeInfo) { + logrus.Debugf("pieces to be processed:%v", info) + pieceTask := &types.PullPieceTaskResponseContinueData{ + Range: fmt.Sprintf("0-%d", ld.length - 1 + config.PieceMetaSize), + PieceNum: 0, + PieceSize: int32(ld.length), + Cid: info.peerID, + PeerIP: info.ip, + PeerPort: info.port, + Path: fmt.Sprintf("%s%s", config.PeerHTTPPathPrefix, info.path), + Url: info.url, + Header: info.header, + DirectSource: info.directSource, + } + + go ld.startTask(ctx, pieceTask) +} + +// PowerClient will download file content and push content to queue and clientQueue +func (ld *LocalDownloader) startTask(ctx context.Context, data 
*types.PullPieceTaskResponseContinueData) {
+	var (
+		nWare transport.NumericalWare
+		key   string
+	)
+
+	nWareOb := ctx.Value("numericalWare")
+	ware, ok := nWareOb.(transport.NumericalWare)
+	if ok {
+		nWare = ware
+	}
+
+	keyOb := ctx.Value("key")
+	k, ok := keyOb.(string)
+	if ok {
+		key = k
+	}
+
+	powerClientConfig := &downloader.PowerClientConfig{
+		TaskID:      ld.taskID,
+		Node:        ld.node,
+		PieceTask:   data,
+		Cfg:         ld.config,
+		Queue:       ld.queue,
+		ClientQueue: ld.clientQueue,
+		RateLimiter: ratelimiter.NewRateLimiter(int64(ld.config.LocalLimit), 2),
+		DownloadAPI: ld.downloadAPI,
+	}
+
+	powerClient := downloader.NewPowerClient(powerClientConfig)
+	if err := powerClient.Run(); err != nil && powerClient.ClientError() != nil {
+		//p2p.API.ReportClientError(p2p.node, powerClient.ClientError())
+		logrus.Errorf("report client error: %v", powerClient.ClientError())
+	} else {
+		cost := powerClient.CostReadTime()
+		if nWare != nil {
+			nWare.Add(key, transport.RemoteIOName, cost.Nanoseconds())
+		}
+	}
+}
diff --git a/dfdaemon/localManager/local_manager.go b/dfdaemon/localManager/local_manager.go
new file mode 100644
index 000000000..6617823b1
--- /dev/null
+++ b/dfdaemon/localManager/local_manager.go
@@ -0,0 +1,466 @@
+package localManager
+
+import (
+	"context"
+	"fmt"
+	types2 "github.com/dragonflyoss/Dragonfly/apis/types"
+	"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
+	"github.com/dragonflyoss/Dragonfly/dfdaemon/scheduler"
+	"github.com/dragonflyoss/Dragonfly/dfdaemon/transport"
+	dfgetcfg "github.com/dragonflyoss/Dragonfly/dfget/config"
+	"github.com/dragonflyoss/Dragonfly/dfget/core/api"
+	"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
+	"github.com/dragonflyoss/Dragonfly/dfget/types"
+	"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
+	"github.com/dragonflyoss/Dragonfly/pkg/httputils"
+	"github.com/go-openapi/strfmt"
+	"io"
+	"math"
+	"net/http"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+)
+
+var (
+	once         sync.Once
+	localManager *LocalManager
+)
+
+// LocalManager will handle all the requests in extreme mode. It schedules
+// peers from the locally synced P2P network first and falls back to the
+// supernode when no peer serves the task.
+type LocalManager struct {
+	sm           *scheduler.SchedulerManager
+	supernodeAPI api.SupernodeAPI
+	downloadAPI  api.DownloadAPI
+	uploaderAPI  api.UploaderAPI
+
+	dfGetConfig *dfgetcfg.Config
+	cfg         config.DFGetConfig
+
+	rm *requestManager
+
+	syncP2PNetworkCh chan string
+
+	syncTimeLock sync.Mutex
+	syncTime     time.Time
+
+	// recentFetchUrls is the urls which were used as parameters to fetch the p2p network recently
+	recentFetchUrls []string
+}
+
+func NewLocalManager(cfg config.DFGetConfig) *LocalManager {
+	once.Do(func() {
+		dfcfg := convertToDFGetConfig(cfg, nil)
+		localPeer := &types2.PeerInfo{
+			ID:   dfcfg.RV.Cid,
+			IP:   strfmt.IPv4(dfcfg.RV.LocalIP),
+			Port: int32(dfcfg.RV.PeerPort),
+		}
+
+		localManager = &LocalManager{
+			sm:               scheduler.NewScheduler(localPeer),
+			supernodeAPI:     api.NewSupernodeAPI(),
+			downloadAPI:      api.NewDownloadAPI(),
+			uploaderAPI:      api.NewUploaderAPI(30 * time.Second),
+			rm:               newRequestManager(),
+			dfGetConfig:      dfcfg,
+			cfg:              cfg,
+			syncTime:         time.Now(),
+			syncP2PNetworkCh: make(chan string, 2),
+		}
+
+		go localManager.fetchLoop(context.Background())
+		go localManager.syncLocalTaskLoop(context.Background())
+		go localManager.heartBeatLoop(context.Background())
+	})
+
+	return localManager
+}
+
+func (lm *LocalManager) fetchLoop(ctx context.Context) {
+	var (
+		lastTime time.Time
+	)
+	defaultInterval := 30 * time.Second
+	ticker := time.NewTicker(defaultInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-
ticker.C: + lm.syncTimeLock.Lock() + lastTime = lm.syncTime + lm.syncTimeLock.Unlock() + + if lastTime.Add(defaultInterval).After(time.Now()) { + continue + } + + lm.syncP2PNetworkInfo(lm.rm.getRecentRequest(0)) + case url := <- lm.syncP2PNetworkCh: + if lm.isRecentFetch(url) { + // the url is fetch recently, directly ignore it + continue + } + lm.syncP2PNetworkInfo(lm.rm.getRecentRequest(0)) + } + } +} + +func (lm *LocalManager) isRecentFetch(url string) bool { + lm.syncTimeLock.Lock() + defer lm.syncTimeLock.Unlock() + + for _, u := range lm.recentFetchUrls { + if u == url { + return true + } + } + + return false +} + +func convertToDFGetConfig(cfg config.DFGetConfig, oldCfg *dfgetcfg.Config) *dfgetcfg.Config { + sign := fmt.Sprintf("%d-%.3f", + os.Getpid(), float64(time.Now().UnixNano())/float64(time.Second)) + + newCfg:= &dfgetcfg.Config{ + Nodes: cfg.SuperNodes, + DFDaemon: true, + Pattern: dfgetcfg.PatternCDN, + Sign: sign, + RV: dfgetcfg.RuntimeVariable{ + LocalIP: cfg.LocalIP, + PeerPort: cfg.PeerPort, + SystemDataDir: cfg.DFRepo, + DataDir: cfg.DFRepo, + Cid: fmt.Sprintf("%s-%s", cfg.LocalIP, sign), + }, + } + + if oldCfg != nil { + newCfg.RV.Cid = oldCfg.RV.Cid + newCfg.Sign = oldCfg.Sign + } + + return newCfg +} + +// +func (lm *LocalManager) DownloadStreamContext(ctx context.Context, url string, header map[string][]string, name string) (io.Reader, error) { + var( + infos = []*downloadNodeInfo{} + rd io.Reader + localDownloader *LocalDownloader + nWare transport.NumericalWare + key string + ) + + taskID := httputils.GetTaskIDFromHeader(url, header, lm.cfg.SpecKeyOfExtremeTaskID) + if taskID == "" { + return nil, fmt.Errorf("in extreme mode, taskID should be set") + } + + nWareOb := ctx.Value("numericalWare") + ware, ok := nWareOb.(transport.NumericalWare) + if ok { + nWare = ware + } + + keyOb := ctx.Value("key") + k, ok := keyOb.(string) + if ok { + key = k + } + + startTime := time.Now() + + info := &downloadNodeInfo{ + directSource: true, + url: url, + header: header, + } + infos = append(infos, info) + + defer lm.rm.addRequest(url, false) + + length := lm.getLengthFromHeader(url, header) + + logrus.Infof("start to download, url: %s, header: %v, taskID: %s, length: %d", url, + header, taskID, length) + + + // try to download from peer by internal schedule + result, err := lm.sm.SchedulerByTaskID(ctx, taskID, lm.dfGetConfig.RV.Cid, "", 0) + if nWare != nil { + nWare.Add(key, transport.ScheduleName, time.Since(startTime).Nanoseconds()) + startTime = time.Now() + } + if err != nil { + go lm.scheduleBySuperNode(ctx, url, header, name, taskID, length) + }else{ + tmpInfos := make([]*downloadNodeInfo, len(result)) + for i, r := range result { + tmpInfos[i] = &downloadNodeInfo{ + ip: r.PeerInfo.IP.String(), + port: int(r.PeerInfo.Port), + path: r.Pieces[0].Path, + peerID: r.PeerInfo.ID, + local: r.Local, + } + } + + infos = append(tmpInfos, infos...) 
+ } + + // local download + localDownloader = NewLocalDownloader() + localDownloader.selectNodes = infos + localDownloader.length = length + localDownloader.taskID = taskID + localDownloader.systemDataDir = lm.dfGetConfig.RV.SystemDataDir + localDownloader.outPath = helper.GetServiceFile(name, lm.dfGetConfig.RV.SystemDataDir) + localDownloader.downloadAPI = lm.downloadAPI + localDownloader.superAPI = lm.supernodeAPI + localDownloader.uploaderAPI = lm.uploaderAPI + localDownloader.config = lm.dfGetConfig + localDownloader.header = header + localDownloader.url = url + localDownloader.taskFileName = name + localDownloader.postNotifyUploader = func(req *api.FinishTaskRequest) { + localTask := &types2.TaskFetchInfo{ + Task: &types2.TaskInfo{ + ID: req.TaskID, + FileLength: req.Other.FileLength, + HTTPFileLength: req.Other.FileLength, + //Headers: req.Other.Headers, + PieceSize: int32(req.Other.FileLength), + PieceTotal: 1, + TaskURL: req.Other.TaskURL, + RawURL: req.Other.RawURL, + }, + Pieces: []*types2.PieceInfo{ + { + Path: req.TaskFileName, + }, + }, + } + lm.sm.AddLocalTaskInfo(localTask) + } + + rd, err = localDownloader.RunStream(ctx) + logrus.Infof("return io.read: %v", rd) + return rd, err +} + +func (lm *LocalManager) scheduleBySuperNode(ctx context.Context, url string, header map[string][]string, name string, taskID string, length int64) { + lm.rm.addRequest(url, false) + lm.notifyFetchP2PNetwork(url) +} + +func (lm *LocalManager) notifyFetchP2PNetwork(url string) { + lm.syncP2PNetworkCh <- url +} + +func (lm *LocalManager) isDownloadDirectReturnSrc(ctx context.Context, url string) (bool, error) { + rs, err := lm.rm.getRequestState(url) + if err != nil { + if err == errortypes.ErrDataNotFound { + return false, nil + } + + return false, err + } + + return rs.needReturnSrc(), nil +} + +// downloadFromPeer download file from peer node. 
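+// It pulls the whole file as one piece from the peer's uploader server.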
+// param:
+//   taskFileName: target file name
+func (lm *LocalManager) downloadFromPeer(peer *types2.PeerInfo, taskFileName string, taskInfo *types2.TaskInfo) (io.Reader, error) {
+	resp, err := lm.downloadAPI.Download(peer.IP.String(), int(peer.Port), &api.DownloadRequest{
+		Path:       taskFileName,
+		PieceRange: fmt.Sprintf("%d-%d", 0, taskInfo.FileLength-1),
+		PieceNum:   1,
+		PieceSize:  int32(taskInfo.FileLength),
+	}, 30*time.Second)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("resp code is %d, not %d", resp.StatusCode, http.StatusOK)
+	}
+
+	// todo: close the body
+	return resp.Body, nil
+}
+
+func (lm *LocalManager) getLengthFromHeader(url string, header map[string][]string) int64 {
+	hr := http.Header(header)
+	if headerStr := hr.Get(dfgetcfg.StrRange); headerStr != "" {
+		ds, err := httputils.GetRangeSE(headerStr, math.MaxInt64)
+		if err != nil {
+			return 0
+		}
+
+		// todo: support the merge request
+		if len(ds) != 1 {
+			return 0
+		}
+
+		return (ds[0].EndIndex - ds[0].StartIndex + 1)
+	}
+
+	return 0
+}
+
+// syncP2PNetworkInfo syncs the p2p network info; it should only be called from fetchLoop.
+func (lm *LocalManager) syncP2PNetworkInfo(urls []string) {
+	if len(urls) == 0 {
+		logrus.Infof("no urls to syncP2PNetworkInfo")
+		return
+	}
+	nodes, err := lm.fetchP2PNetworkInfo(urls)
+	if err != nil {
+		logrus.Errorf("failed to fetchP2PNetworkInfo: %v", err)
+		return
+	}
+
+	lm.sm.SyncSchedulerInfo(nodes)
+	logrus.Infof("success to sync schedule info")
+	lm.syncTimeLock.Lock()
+	defer lm.syncTimeLock.Unlock()
+	lm.syncTime = time.Now()
+	lm.recentFetchUrls = urls
+}
+
+func (lm *LocalManager) fetchP2PNetworkInfo(urls []string) ([]*types2.Node, error) {
+	req := &types.FetchP2PNetworkInfoRequest{
+		Urls: urls,
+	}
+
+	for _, node := range lm.dfGetConfig.Nodes {
+		result, err := lm.fetchP2PNetworkFromSupernode(node, req)
+		if err != nil {
+			continue
+		}
+
+		return result, nil
+	}
+
+	return nil, nil
+}
+
+func (lm *LocalManager) fetchP2PNetworkFromSupernode(node string, req *types.FetchP2PNetworkInfoRequest) ([]*types2.Node, error) {
+	var (
+		start int = 0
+		limit int = 100
+	)
+
+	result := []*types2.Node{}
+	for {
+		resp, err := lm.supernodeAPI.FetchP2PNetworkInfo(node, start, limit, req)
+		if err != nil {
+			return nil, err
+		}
+
+		if resp.Code != http.StatusOK {
+			return nil, fmt.Errorf("failed to fetch p2p network info: %s", resp.Msg)
+		}
+
+		result = append(result, resp.Data.Nodes...)
+		if len(resp.Data.Nodes) < limit {
+			break
+		}
+		// advance the offset, otherwise the same page would be fetched forever
+		start += limit
+	}
+
+	return result, nil
+}
+
+// syncLocalTaskLoop fetches local tasks to feed the local scheduler
+func (lm *LocalManager) syncLocalTaskLoop(ctx context.Context) {
+	for {
+		check := lm.checkUploader(ctx, 30*time.Second)
+		if !check {
+			time.Sleep(time.Minute)
+			continue
+		}
+
+		break
+	}
+
+	// call it firstly
+	lm.fetchAndSyncLocalTask()
+
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			lm.fetchAndSyncLocalTask()
+		}
+	}
+}
+
+func (lm *LocalManager) checkUploader(ctx context.Context, timeout time.Duration) bool {
+	ticker := time.NewTicker(time.Second * 1)
+	t := time.NewTimer(timeout)
+	defer t.Stop()
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if lm.uploaderAPI.PingServer(lm.cfg.LocalIP, lm.cfg.PeerPort) {
+				return true
+			}
+		case <-t.C:
+			return false
+		case <-ctx.Done():
+			return false
+		}
+	}
+}
+
+func (lm *LocalManager) fetchAndSyncLocalTask() {
+	result, err := lm.uploaderAPI.FetchLocalTask(lm.cfg.LocalIP, lm.cfg.PeerPort)
+	if err != nil {
+		logrus.Errorf("failed to fetch local task: %v", err)
+		return
+	}
+
+	lm.sm.SyncLocalTaskInfo(result)
+}
+
+func (lm *LocalManager) heartBeatLoop(ctx context.Context) {
+	ticker := time.NewTicker(time.Second * 30)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			lm.heartbeat()
+		}
+	}
+}
+
+func (lm *LocalManager) heartbeat() {
+	for _, node := range lm.cfg.SuperNodes {
+		lm.supernodeAPI.HeartBeat(node, &types2.HeartBeatRequest{
+			IP:   lm.cfg.LocalIP,
+			Port: int32(lm.cfg.PeerPort),
+			CID:  lm.dfGetConfig.RV.Cid,
+		})
+	}
+}
diff --git a/dfdaemon/localManager/request_manager.go b/dfdaemon/localManager/request_manager.go
new file mode 100644
index 000000000..90459cf5c
--- /dev/null
+++ b/dfdaemon/localManager/request_manager.go
@@ -0,0 +1,81 @@
+package localManager
+
+import (
+	"time"
+
+	"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
+	"github.com/dragonflyoss/Dragonfly/pkg/queue"
+
+	"github.com/pkg/errors"
+)
+
+// requestManager manages the recent requests. It provides the recent request
+// urls and the per-url state used to decide whether to return the source directly.
+type requestManager struct {
+	q *queue.CircleQueue
+}
+
+func newRequestManager() *requestManager {
+	return &requestManager{
+		q: queue.NewCircleQueue(100),
+	}
+}
+
+func (rm *requestManager) addRequest(url string, directReturnSrc bool) error {
+	data, err := rm.q.GetItemByKey(url)
+	if err != nil && err != errortypes.ErrDataNotFound {
+		return err
+	}
+
+	var rs *requestState
+	if err == errortypes.ErrDataNotFound {
+		rs = newRequestState(url, directReturnSrc)
+	} else {
+		ors, ok := data.(*requestState)
+		if !ok {
+			return errors.Wrapf(errortypes.ErrConvertFailed, "value: %v", data)
+		}
+
+		rs = ors.copy()
+		// if directReturnSrc == true, update first time to extend the returnSrc time interval
+		if directReturnSrc {
+			rs.directReturnSrc = directReturnSrc
+			rs.firstTime = time.Now()
+		}
+	}
+
+	rs.updateRecentTime()
+	rm.q.PutFront(url, rs)
+
+	return nil
+}
+
+// getRecentRequest returns up to count of the most recent request urls;
+// count defaults to 5 when it is 0.
+func (rm *requestManager) getRecentRequest(count int) []string {
+	if count == 0 {
+		count = 5
+	}
+	arr := rm.q.GetFront(count)
+	result := []string{}
+
+	for _, d := range arr {
+		if rs, ok := d.(*requestState); ok {
+			result = append(result, rs.url)
+		}
+	}
+
+	return result
+}
+
+func (rm *requestManager) getRequestState(url string) (*requestState, error) {
+	data, err :=
rm.q.GetItemByKey(url) + if err != nil { + return nil, err + } + + if rs, ok := data.(*requestState); ok { + return rs, nil + } + + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "value: %v", data) +} diff --git a/dfdaemon/localManager/request_manager_test.go b/dfdaemon/localManager/request_manager_test.go new file mode 100644 index 000000000..f5256cea3 --- /dev/null +++ b/dfdaemon/localManager/request_manager_test.go @@ -0,0 +1,123 @@ +package localManager + +import ( + "github.com/go-check/check" + "testing" +) + +type RequestManagerSuite struct{} + +func Test(t *testing.T) { + check.TestingT(t) +} + +func init() { + check.Suite(&RequestManagerSuite{}) +} + +func (suite *RequestManagerSuite) TestRequestManager(c *check.C) { + for _, i := range []struct { + name string + urls []string + expectUrls []string + recentCount int + }{ + { + name: "oneUrlWithOneExpect", + urls: []string{ + "a.com", + }, + expectUrls: []string{ + "a.com", + }, + recentCount: 1, + },{ + name: "MoreUrlWithOneExpect", + urls: []string{ + "a.com", + "b.com", + "c.com", + "b.com", + }, + expectUrls: []string{ + "b.com", + }, + recentCount: 1, + }, + { + name: "MoreUrlWithTwoExpect", + urls: []string{ + "a.com", + "b.com", + "c.com", + "b.com", + "d.com", + "a.com", + "e.com", + }, + expectUrls: []string{ + "a.com", + "e.com", + }, + recentCount: 2, + }, + { + name: "twoUrlWithMoreExpect", + urls: []string{ + "a.com", + "b.com", + "a.com", + }, + expectUrls: []string{ + "a.com", + "b.com", + }, + recentCount: 5, + }, + { + name: "MoreUrlWithMoreExpect", + urls: []string{ + "a.com", + "b.com", + "a.com", + "c.com", + "d.com", + "e.com", + "f.com", + "f.com", + "d.com", + }, + expectUrls: []string{ + "a.com", + "c.com", + "e.com", + "f.com", + "d.com", + }, + recentCount: 5, + }, + }{ + c.Logf("testcase %s", i.name) + rm := newRequestManager() + for _, url := range i.urls { + err := rm.addRequest(url, false) + c.Assert(err, check.IsNil) + } + + result := rm.getRecentRequest(i.recentCount) + c.Assert(len(result), check.Equals, len(i.expectUrls)) + + assertMap := map[string]struct{}{} + for _, u := range result { + assertMap[u] = struct{}{} + } + + for _, epU := range i.expectUrls { + _, exist := assertMap[epU] + c.Assert(exist, check.Equals, true) + delete(assertMap, epU) + } + + c.Assert(len(assertMap), check.Equals, 0) + } +} \ No newline at end of file diff --git a/dfdaemon/localManager/request_state.go b/dfdaemon/localManager/request_state.go new file mode 100644 index 000000000..5bb6b29b9 --- /dev/null +++ b/dfdaemon/localManager/request_state.go @@ -0,0 +1,57 @@ +package localManager + +import ( + "time" +) + +const( + defaultReturnSrcInterval = 30 * time.Second +) + +type requestState struct { + // the request url + url string + // the time when the url firstly requested + firstTime time.Time + // the recent time when the url requested + recentTime time.Time + + // is direct to return src url + directReturnSrc bool + + returnSrcInterval time.Duration +} + +func newRequestState(url string, directReturnSrc bool) *requestState { + return &requestState{ + url: url, + firstTime: time.Now(), + recentTime: time.Now(), + directReturnSrc: directReturnSrc, + returnSrcInterval: defaultReturnSrcInterval, + } +} + +func (rs *requestState) copy() *requestState { + return &requestState{ + url: rs.url, + firstTime: rs.firstTime, + recentTime: rs.recentTime, + directReturnSrc: rs.directReturnSrc, + returnSrcInterval: rs.returnSrcInterval, + } +} + +func (rs *requestState) updateRecentTime() { + rs.recentTime = time.Now() + 
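	// once the direct-return window since firstTime has elapsed, stop returning the source directly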
if rs.directReturnSrc && rs.firstTime.Add(rs.returnSrcInterval).Before(time.Now()) { + rs.directReturnSrc = false + } +} + +func (rs *requestState) needReturnSrc() bool { + return rs.directReturnSrc && rs.firstTime.Add(rs.returnSrcInterval).After(time.Now()) +} + + + diff --git a/dfdaemon/proxy/proxy.go b/dfdaemon/proxy/proxy.go index 61fddbad8..9436ccca2 100644 --- a/dfdaemon/proxy/proxy.go +++ b/dfdaemon/proxy/proxy.go @@ -29,7 +29,9 @@ import ( "github.com/dragonflyoss/Dragonfly/dfdaemon/config" "github.com/dragonflyoss/Dragonfly/dfdaemon/downloader" + "github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/dfget" "github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/p2p" + "github.com/dragonflyoss/Dragonfly/dfdaemon/localManager" "github.com/dragonflyoss/Dragonfly/dfdaemon/transport" "github.com/pkg/errors" @@ -108,6 +110,20 @@ func WithStreamDownloaderFactory(f downloader.StreamFactory) Option { } } +func WithExtremeDownloaderFactory(f downloader.StreamFactory) Option { + return func(p *Proxy) error { + p.extremeDownloadFactory = f + return nil + } +} + +func WithConfigProperties(c *config.Properties) Option { + return func(p *Proxy) error { + p.config = c + return nil + } +} + // New returns a new transparent proxy with the given rules func New(opts ...Option) (*Proxy, error) { proxy := &Proxy{ @@ -129,9 +145,15 @@ func NewFromConfig(c config.Properties) (*Proxy, error) { WithRules(c.Proxies), WithRegistryMirror(c.RegistryMirror), WithStreamDownloaderFactory(func() downloader.Stream { - // dfget.NewGetter(c.DFGetConfig()) return p2p.NewClient(c.DFGetConfig()) }), + WithExtremeDownloaderFactory(func() downloader.Stream { + return localManager.NewLocalManager(c.DFGetConfig()) + }), + WithDownloaderFactory(func() downloader.Interface { + return dfget.NewGetter(c.DFGetConfig()) + }), + WithConfigProperties(&c), } logrus.Infof("registry mirror: %s", c.RegistryMirror.Remote) @@ -179,16 +201,20 @@ type Proxy struct { // directHandler are used to handle non proxy requests directHandler http.Handler // downloadFactory returns the downloader used for p2p downloading - downloadFactory downloader.Factory - streamDownloadFactory downloader.StreamFactory + downloadFactory downloader.Factory + streamDownloadFactory downloader.StreamFactory + extremeDownloadFactory downloader.StreamFactory + config *config.Properties } func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) { reverseProxy := httputil.NewSingleHostReverseProxy(proxy.registry.Remote.URL) t, err := transport.New( transport.WithDownloader(proxy.downloadFactory()), + transport.WithExtremeDownloader(proxy.extremeDownloadFactory()), transport.WithTLS(proxy.registry.TLSConfig()), transport.WithCondition(proxy.shouldUseDfgetForMirror), + transport.WithConfig(proxy.config), ) if err != nil { http.Error(w, fmt.Sprintf("failed to get transport: %v", err), http.StatusInternalServerError) @@ -249,8 +275,10 @@ func (proxy *Proxy) handleHTTP(w http.ResponseWriter, req *http.Request) { func (proxy *Proxy) roundTripper(tlsConfig *tls.Config) http.RoundTripper { rt, _ := transport.New( transport.WithStreamDownloader(proxy.streamDownloadFactory()), + transport.WithExtremeDownloader(proxy.extremeDownloadFactory()), transport.WithTLS(tlsConfig), transport.WithCondition(proxy.shouldUseDfget), + transport.WithConfig(proxy.config), ) return rt } diff --git a/dfdaemon/scheduler/dataMap.go b/dfdaemon/scheduler/dataMap.go new file mode 100644 index 000000000..b91703301 --- /dev/null +++ b/dfdaemon/scheduler/dataMap.go @@ -0,0 +1,80 @@ 
+package scheduler + +import ( + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + "github.com/dragonflyoss/Dragonfly/pkg/syncmap" + "github.com/pkg/errors" + + +) + +type dataMap struct { + *syncmap.SyncMap +} + +func newDataMap() *dataMap { + return &dataMap{ + syncmap.NewSyncMap(), + } +} + +func (dm *dataMap) add(key string, value interface{}) error { + return dm.Add(key, value) +} + +func (dm *dataMap) remove(key string) error { + return dm.Remove(key) +} + +func (dm *dataMap) getAsTaskState(key string) (*taskState, error) { + if stringutils.IsEmptyStr(key) { + return nil, errors.Wrap(errortypes.ErrEmptyValue, "taskID") + } + + v, err := dm.Get(key) + if err != nil { + return nil, err + } + + if ts, ok := v.(*taskState); ok { + return ts, nil + } + + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) +} + +func (dm *dataMap) getAsNode(key string) (*types.Node, error) { + if stringutils.IsEmptyStr(key) { + return nil, errors.Wrap(errortypes.ErrEmptyValue, "taskID") + } + + v, err := dm.Get(key) + if err != nil { + return nil, err + } + + if ts, ok := v.(*types.Node); ok { + return ts, nil + } + + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) +} + +func (dm *dataMap) getAsLocalTaskState(key string) (*localTaskState, error) { + if stringutils.IsEmptyStr(key) { + return nil, errors.Wrap(errortypes.ErrEmptyValue, "taskID") + } + + v, err := dm.Get(key) + if err != nil { + return nil, err + } + + if lts, ok := v.(*localTaskState); ok { + return lts, nil + } + + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) +} diff --git a/dfdaemon/scheduler/local_task_state.go b/dfdaemon/scheduler/local_task_state.go new file mode 100644 index 000000000..88011f4be --- /dev/null +++ b/dfdaemon/scheduler/local_task_state.go @@ -0,0 +1,8 @@ +package scheduler + +import "github.com/dragonflyoss/Dragonfly/apis/types" + +type localTaskState struct { + task *types.TaskFetchInfo + path string +} diff --git a/dfdaemon/scheduler/scheduler.go b/dfdaemon/scheduler/scheduler.go new file mode 100644 index 000000000..0156382f6 --- /dev/null +++ b/dfdaemon/scheduler/scheduler.go @@ -0,0 +1,241 @@ +package scheduler + +import ( + "context" + "fmt" + "github.com/dragonflyoss/Dragonfly/apis/types" + dtypes "github.com/dragonflyoss/Dragonfly/dfget/types" + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "sync" + + "github.com/sirupsen/logrus" +) + +const( + defaultMaxLoad = 5 + defaultMaxPeers = 3 +) + +// SchedulerManager schedules the peer node to fetch the resource specified by request +type SchedulerManager struct { + sync.Mutex + localPeerInfo *types.PeerInfo + // generation + generation int64 + + // key is taskID, value is taskState + taskContainer *dataMap + + // key is taskID, value is localTaskState + localTaskContainer *dataMap + + // key is peerID, value is Node + nodeContainer *dataMap + + // key is the taskURL + urlContainer *dataMap + + + downloadStartCh chan notifySt + downloadFinishCh chan notifySt +} + +func NewScheduler(localPeer *types.PeerInfo) *SchedulerManager { + sm := &SchedulerManager{ + taskContainer: newDataMap(), + localTaskContainer: newDataMap(), + nodeContainer: newDataMap(), + urlContainer: newDataMap(), + localPeerInfo: localPeer, + downloadStartCh: make(chan notifySt, 10), + downloadFinishCh: make(chan notifySt, 10), + + } + + go sm.adjustPeerLoad() + return sm +} + +// todo: +func (sm 
*SchedulerManager) adjustPeerLoad() { + for{ + select { + case st := <- sm.downloadStartCh: + if st.generation != sm.generation { + logrus.Infof("in local schedule downloadStartCh, notify %v is out of generation", st) + continue + } + + sm.updateNodeLoad(st.peerID, 1) + + case st := <- sm.downloadFinishCh: + if st.generation != sm.generation { + logrus.Infof("in local schedule downloadFinishCh , notify %v is out of generation", st) + continue + } + + sm.updateNodeLoad(st.peerID, -1) + } + } +} + +// if pieceRange == "" means all Pieces of file +func (sm *SchedulerManager) SchedulerByTaskID(ctx context.Context, taskID string, srcCid string, pieceRange string, pieceSize int32) ([]*Result, error) { + sm.Lock() + defer sm.Unlock() + + result := []*Result{} + + // get local task if + localRs := sm.scheduleLocalPeer(taskID) + if localRs != nil { + result = []*Result{localRs} + } + + remoteRs, err := sm.scheduleRemotePeer(ctx, taskID, srcCid, pieceRange, pieceSize) + if err != nil { + if len(result) > 0 { + return result, nil + } + return result, err + } + + result = append(result, remoteRs...) + + return result, nil +} + +func (sm *SchedulerManager) scheduleRemotePeer(ctx context.Context, taskID string, srcCid string, pieceRange string, pieceSize int32) ([]*Result, error) { + state, err := sm.taskContainer.getAsTaskState(taskID) + if err != nil { + return nil, err + } + + pns := state.getPeersByLoad(defaultMaxPeers, defaultMaxLoad) + if len(pns) == 0 { + return nil, fmt.Errorf("failed to schedule peers") + } + + result := make([]*Result, len(pns)) + for i, pn := range pns { + node, err := sm.nodeContainer.getAsNode(pn.peerID) + if err != nil { + logrus.Errorf("failed to get node: %v", err) + continue + } + + result[i] = &Result{ + DstCid: pn.peerID, + Pieces: pn.pieces, + Task: pn.info, + PeerInfo: node.Basic, + Generation: sm.generation, + StartDownload: func(peerID string, generation int64) { + sm.downloadStartCh <- notifySt{peerID: peerID, generation: generation} + }, + FinishDownload: func(peerID string, generation int64) { + sm.downloadFinishCh <- notifySt{peerID: peerID, generation: generation} + }, + } + } + + return result, nil +} + +func (sm *SchedulerManager) SchedulerByUrl(ctx context.Context, url string, srcCid string, pieceRange string, pieceSize int32) ([]*Result, error) { + return nil, nil +} + +func (sm *SchedulerManager) SyncSchedulerInfo(nodes []*types.Node) { + newTaskContainer := newDataMap() + newNodeContainer := newDataMap() + // todo: urlContainer init + + for _, node := range nodes { + newNodeContainer.add(node.Basic.ID, node) + sm.syncTaskContainerPerNode(node, newTaskContainer) + } + + // replace the taskContainer and nodeContainer + sm.Lock() + defer sm.Unlock() + + sm.taskContainer = newTaskContainer + sm.nodeContainer = newNodeContainer +} + +func (sm *SchedulerManager) SyncLocalTaskInfo(tasks *dtypes.FetchLocalTaskInfo) { + newLocalTaskContainer := newDataMap() + for _, task := range tasks.Tasks { + newLocalTaskContainer.add(task.Task.ID, &localTaskState{task: task}) + } + + // replace the localTaskContainer + sm.Lock() + defer sm.Unlock() + + sm.localTaskContainer = newLocalTaskContainer +} + +func (sm *SchedulerManager) AddLocalTaskInfo(task *types.TaskFetchInfo) { + sm.localTaskContainer.add(task.Task.ID, &localTaskState{task: task}) +} + +func (sm *SchedulerManager) syncTaskContainerPerNode(node *types.Node, taskContainer *dataMap) { + for _, task := range node.Tasks { + ts, err := taskContainer.getAsTaskState(task.Task.ID) + if err != nil && 
!errortypes.IsDataNotFound(err) {
+			logrus.Errorf("syncTaskContainerPerNode error: %v", err)
+			continue
+		}
+
+		if ts == nil {
+			ts = newTaskState()
+			if err := taskContainer.add(task.Task.ID, ts); err != nil {
+				logrus.Errorf("syncTaskContainerPerNode add taskstate %v to taskContainer error: %v", ts, err)
+				continue
+			}
+		}
+
+		err = ts.add(node.Basic.ID, &node.Load, task.Pieces, task.Task)
+		if err != nil {
+			logrus.Errorf("syncTaskContainerPerNode error: %v", err)
+		}
+	}
+}
+
+func (sm *SchedulerManager) updateNodeLoad(peerID string, addLoad int) {
+	sm.Lock()
+	defer sm.Unlock()
+
+	node, err := sm.nodeContainer.getAsNode(peerID)
+	if err != nil {
+		logrus.Errorf("updateNodeLoad failed: %v", err)
+		return
+	}
+
+	node.Load += int64(addLoad)
+	if node.Load < 0 {
+		node.Load = 0
+	}
+}
+
+func (sm *SchedulerManager) scheduleLocalPeer(taskID string) *Result {
+	lts, err := sm.localTaskContainer.getAsLocalTaskState(taskID)
+	if err != nil {
+		return nil
+	}
+
+	return sm.convertLocalTaskStateToResult(lts)
+}
+
+func (sm *SchedulerManager) convertLocalTaskStateToResult(lts *localTaskState) *Result {
+	return &Result{
+		DstCid:     sm.localPeerInfo.ID,
+		PeerInfo:   sm.localPeerInfo,
+		Task:       lts.task.Task,
+		Pieces:     lts.task.Pieces,
+		Generation: sm.generation,
+		Local:      true,
+	}
+}
diff --git a/dfdaemon/scheduler/task_state.go b/dfdaemon/scheduler/task_state.go
new file mode 100644
index 000000000..f327ba02c
--- /dev/null
+++ b/dfdaemon/scheduler/task_state.go
@@ -0,0 +1,92 @@
+package scheduler
+
+import (
+	"sort"
+
+	"github.com/dragonflyoss/Dragonfly/apis/types"
+	"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
+	"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
+	"github.com/dragonflyoss/Dragonfly/pkg/syncmap"
+	"github.com/pkg/errors"
+)
+
+type taskStatePerNode struct {
+	peerID string
+	load   *int64
+	info   *types.TaskInfo
+	pieces []*types.PieceInfo
+}
+
+type loadSorter struct {
+	items []*taskStatePerNode
+}
+
+func (ls *loadSorter) Len() int {
+	return len(ls.items)
+}
+
+// Less reports whether the element with
+// index i should sort before the element with index j.
+func (ls *loadSorter) Less(i, j int) bool {
+	return *(ls.items[i].load) < *(ls.items[j].load)
+}
+
+// Swap swaps the elements with indexes i and j.
+func (ls *loadSorter) Swap(i, j int) {
+	ls.items[i], ls.items[j] = ls.items[j], ls.items[i]
+}
+
+type taskState struct {
+	// key is peerID, value is taskStatePerNode
+	peerContainer *syncmap.SyncMap
+}
+
+func newTaskState() *taskState {
+	return &taskState{
+		peerContainer: syncmap.NewSyncMap(),
+	}
+}
+
+func (ts *taskState) add(peerID string, load *int64, pieces []*types.PieceInfo, info *types.TaskInfo) error {
+	if stringutils.IsEmptyStr(peerID) {
+		return errors.Wrap(errortypes.ErrEmptyValue, "peerID")
+	}
+
+	_, err := ts.peerContainer.Get(peerID)
+	if err != nil && !errortypes.IsDataNotFound(err) {
+		return err
+	}
+
+	item := &taskStatePerNode{
+		peerID: peerID,
+		load:   load,
+		pieces: pieces,
+		info:   info,
+	}
+
+	return ts.peerContainer.Add(peerID, item)
+}
+
+// getPeersByLoad returns the peers that can serve the task, ordered by load in ascending order:
+// at most maxCount peers are returned,
+// and peers whose load is maxLoad or higher are excluded.
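+// The load values are pointers shared with the node container, so downloads
+// tracked by updateNodeLoad are reflected here immediately.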
+func (ts *taskState) getPeersByLoad(maxCount int, maxLoad int64) []*taskStatePerNode { + sorter := loadSorter{} + + ts.peerContainer.Range(func(key, value interface{}) bool { + pn := value.(*taskStatePerNode) + if *(pn.load) >= maxLoad { + return true + } + + sorter.items = append(sorter.items, pn) + return true + }) + + sort.Sort(&sorter) + if maxCount > len(sorter.items) { + return sorter.items + } + + return sorter.items[:maxCount] +} diff --git a/dfdaemon/scheduler/types.go b/dfdaemon/scheduler/types.go new file mode 100644 index 000000000..9186bbe21 --- /dev/null +++ b/dfdaemon/scheduler/types.go @@ -0,0 +1,28 @@ +package scheduler + +import "github.com/dragonflyoss/Dragonfly/apis/types" + +type taskInfoWrapper struct { + task *types.TaskInfo + node *types.Node +} + +type Result struct { + DstCid string + Local bool + PeerInfo *types.PeerInfo + Task *types.TaskInfo + Pieces []*types.PieceInfo + + Generation int64 + // before download, it should be called + StartDownload func(peerID string, generation int64) + + // after finishing download, it should be called + FinishDownload func(peerID string, generation int64) +} + +type notifySt struct { + peerID string + generation int64 +} \ No newline at end of file diff --git a/dfdaemon/server.go b/dfdaemon/server.go index 799ccf3a2..6fbec1303 100644 --- a/dfdaemon/server.go +++ b/dfdaemon/server.go @@ -21,6 +21,8 @@ import ( "crypto/tls" "fmt" "net/http" + "path/filepath" + "time" "github.com/dragonflyoss/Dragonfly/dfdaemon/config" "github.com/dragonflyoss/Dragonfly/dfdaemon/handler" @@ -122,6 +124,11 @@ func LaunchPeerServer(cfg config.Properties) error { peerServerConfig.RV.LocalIP = cfg.LocalIP peerServerConfig.RV.PeerPort = cfg.PeerPort peerServerConfig.RV.ServerAliveTime = 0 + peerServerConfig.RV.SystemDataDir = cfg.DFRepo + peerServerConfig.RV.DataDir = cfg.DFRepo + peerServerConfig.RV.MetaPath = filepath.Join(cfg.WorkHome, "meta") + peerServerConfig.RV.DataExpireTime = time.Hour * 1 + peerServerConfig.Nodes = cfg.SuperNodes port, err := uploader.LaunchPeerServer(peerServerConfig) if err != nil { return err diff --git a/dfdaemon/transport/numerical.go b/dfdaemon/transport/numerical.go new file mode 100644 index 000000000..047ee7191 --- /dev/null +++ b/dfdaemon/transport/numerical.go @@ -0,0 +1,330 @@ +package transport + +import ( + "io" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +var( + // signal mode + nWare NumericalWare +) + +func init() { + nWare = NewNumericalWare() +} + +const( + TotalName = "total" + ScheduleName = "schedule" + RemoteIOName = "remote-io" + CSWWriteName = "csw-write" + RequestReadName = "request-read" +) + +type NumericalWare interface { + Add(key string, name string, data int64) + // reset num + Reset() + + Average() int64 + Sum() int64 + OutPut() []*MultiLevelSt + + OutputWithBaseLine(baseLine int64) *NumericalResult +} + +type MultiLevelSt struct { + ma map[string]int64 +} + +type numericalWare struct { + sync.Mutex + dataMap map[string]*MultiLevelSt + sum int64 + count int64 + cap int64 +} + +func NewNumericalWare() NumericalWare { + return &numericalWare{ + dataMap: make(map[string]*MultiLevelSt, 10000), + cap: 10000, + } +} + +func (n *numericalWare) Add(key string, name string, data int64) { + go n.add(key, name, data) +} + +func (n *numericalWare) add(key string, name string, data int64) { + n.Lock() + defer n.Unlock() + + var d *MultiLevelSt + + d, exist := n.dataMap[key] + if !exist { + d = &MultiLevelSt{ + ma: make(map[string]int64), + } + n.dataMap[key] = d + n.count ++ + } + + 
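	// overwrite any previous sample recorded under the same stage name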
d.ma[name] = data
+
+	if name == TotalName {
+		n.sum += data
+	}
+}
+
+func (n *numericalWare) Reset() {
+	n.Lock()
+	defer n.Unlock()
+
+	n.sum = 0
+	n.dataMap = make(map[string]*MultiLevelSt, 10000)
+	n.count = 0
+}
+
+func (n *numericalWare) Average() int64 {
+	n.Lock()
+	defer n.Unlock()
+
+	if n.count == 0 {
+		return 0
+	}
+	return n.sum / n.count
+}
+
+func (n *numericalWare) Sum() int64 {
+	n.Lock()
+	defer n.Unlock()
+
+	return n.sum
+}
+
+func (n *numericalWare) OutPut() []*MultiLevelSt {
+	n.Lock()
+	defer n.Unlock()
+
+	copyData := make([]*MultiLevelSt, n.count)
+	i := 0
+	for _, v := range n.dataMap {
+		copyData[i] = v
+		i++
+	}
+
+	return copyData
+}
+
+func (n *numericalWare) OutputWithBaseLine(baseLine int64) *NumericalResult {
+	var (
+		totalCount       = 0
+		ioCount          = 0
+		scheduleCount    = 0
+		cswWriteCount    = 0
+		requestReadCount = 0
+
+		totalSum       int64 = 0
+		ioSum          int64 = 0
+		scheduleSum    int64 = 0
+		cswWriteSum    int64 = 0
+		requestReadSum int64 = 0
+	)
+
+	data := n.OutPut()
+	if len(data) == 0 {
+		return &NumericalResult{}
+	}
+
+	resultData := make([][]int64, len(data))
+
+	for i, d := range data {
+		singleD := make([]int64, 5)
+		total := mapValue(d.ma, TotalName)
+		if total > 0 {
+			totalCount++
+			singleD[0] = total
+			totalSum += total
+		}
+
+		rio := mapValue(d.ma, RemoteIOName)
+		if rio > 0 {
+			ioCount++
+			singleD[1] = rio
+			ioSum += rio
+		}
+
+		sh := mapValue(d.ma, ScheduleName)
+		if sh > 0 {
+			scheduleCount++
+			singleD[2] = sh
+			scheduleSum += sh
+		}
+
+		cw := mapValue(d.ma, CSWWriteName)
+		if cw > 0 {
+			cswWriteCount++
+			singleD[3] = cw
+			cswWriteSum += cw
+		}
+
+		rr := mapValue(d.ma, RequestReadName)
+		if rr > 0 {
+			requestReadCount++
+			singleD[4] = rr
+			requestReadSum += rr
+		}
+
+		resultData[i] = singleD
+	}
+
+	// a stage may have no samples, so guard each average against division by zero
+	var ioAve, scheduleAve, cswWriteAve, requestReadAve int64
+	if ioCount > 0 {
+		ioAve = ioSum / int64(ioCount)
+	}
+	if scheduleCount > 0 {
+		scheduleAve = scheduleSum / int64(scheduleCount)
+	}
+	if cswWriteCount > 0 {
+		cswWriteAve = cswWriteSum / int64(cswWriteCount)
+	}
+	if requestReadCount > 0 {
+		requestReadAve = requestReadSum / int64(requestReadCount)
+	}
+
+	rs := &NumericalResult{
+		SumIO:              ioSum,
+		AverageIO:          ioAve,
+		SumSchedule:        scheduleSum,
+		AverageSchedule:    scheduleAve,
+		SumCwsWrite:        cswWriteSum,
+		AverageCwsWrite:    cswWriteAve,
+		SumRequestRead:     requestReadSum,
+		AverageRequestRead: requestReadAve,
+		Data:               resultData,
+		BaseLine:           baseLine,
+	}
+
+	rs.autoCompute()
+	return rs
+}
+
+type numericalReader struct {
+	io.Reader
+	start time.Time
+	n     NumericalWare
+	key   string
+}
+
+func NewNumericalReader(rd io.Reader, startTime time.Time, key string, n NumericalWare) *numericalReader {
+	return &numericalReader{
+		Reader: rd,
+		start:  startTime,
+		n:      n,
+		key:    key,
+	}
+}
+
+func (r *numericalReader) Close() error {
+	logrus.Infof("close the reader")
+	cost := time.Since(r.start).Nanoseconds()
+	r.n.Add(r.key, TotalName, cost)
+	return nil
+}
+
+type NumericalResult struct {
+	Sum                int64
+	Average            int64
+	SumIO              int64
+	AverageIO          int64
+	SumSchedule        int64
+	AverageSchedule    int64
+	SumCwsWrite        int64
+	AverageCwsWrite    int64
+	SumRequestRead     int64
+	AverageRequestRead int64
+
+	Data [][]int64
+
+	BaseLine int64
+	// BaseLineSum is the sum after subtracting the baseline from every sample
+	BaseLineSum int64
+
+	BaseLineAverage int64
+
+	Range1ms         int64
+	Range1To10ms     int64
+	Range10To50ms    int64
+	Range50To100ms   int64
+	Range100To500ms  int64
+	Range500To1000ms int64
+	RangeOut1s       int64
+
+	Err error
+}
+
+func (r *NumericalResult) autoCompute() {
+	var sum int64 = 0
+	count := len(r.Data)
+	for _, d := range r.Data {
+		r.filter(d[0])
+		sum += d[0]
+	}
+
+	if r.Sum == 0 {
+		r.Sum = sum
+	}
+
+	if r.Average == 0 {
+		if count != 0 {
+			r.Average = r.Sum /
int64(count) + } + } + + r.BaseLineSum = r.Sum - r.BaseLine * int64(count) + r.BaseLineAverage = r.Average - r.BaseLine +} + +func (r *NumericalResult) filter(data int64) { + const mNumber = 1000 * 1000 + dataWithBaseLine := data - r.BaseLine + + if dataWithBaseLine < mNumber { + r.Range1ms ++ + return + } + + if dataWithBaseLine < 10 * mNumber { + r.Range1To10ms ++ + return + } + + if dataWithBaseLine < 50 * mNumber { + r.Range10To50ms ++ + return + } + + if dataWithBaseLine < 100 * mNumber { + r.Range50To100ms ++ + return + } + + if dataWithBaseLine < 500 * mNumber { + r.Range100To500ms ++ + return + } + + if dataWithBaseLine < 1000 * mNumber { + r.Range500To1000ms ++ + return + } + + r.RangeOut1s ++ +} + +func mapValue(m map[string]int64, key string) int64 { + v, ok := m[key] + if ok { + return v + } + + return -1 +} diff --git a/dfdaemon/transport/transport.go b/dfdaemon/transport/transport.go index 3d7a635ef..66785245b 100644 --- a/dfdaemon/transport/transport.go +++ b/dfdaemon/transport/transport.go @@ -20,11 +20,17 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" + "fmt" + "github.com/dragonflyoss/Dragonfly/dfdaemon/config" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" "io" "io/ioutil" "net" "net/http" "regexp" + "strconv" + "strings" "time" "github.com/pborman/uuid" @@ -49,6 +55,10 @@ type DFRoundTripper struct { ShouldUseDfget func(req *http.Request) bool Downloader downloader.Interface StreamDownloader downloader.Stream + ExtremeDownloader downloader.Stream + config *config.Properties + + nWare NumericalWare } // New returns the default DFRoundTripper. @@ -57,6 +67,7 @@ func New(opts ...Option) (*DFRoundTripper, error) { Round: defaultHTTPTransport(nil), Round2: http.NewFileTransport(http.Dir("/")), ShouldUseDfget: NeedUseGetter, + nWare: nWare, } for _, opt := range opts { @@ -115,6 +126,20 @@ func WithStreamDownloader(d downloader.Stream) Option { } } +func WithExtremeDownloader(d downloader.Stream) Option { + return func(rt *DFRoundTripper) error { + rt.ExtremeDownloader = d + return nil + } +} + +func WithConfig(cfg *config.Properties) Option { + return func(rt *DFRoundTripper) error { + rt.config = cfg + return nil + } +} + // WithCondition configures how to decide whether to use dfget or not. 
func WithCondition(c func(r *http.Request) bool) Option {
 	return func(rt *DFRoundTripper) error {
@@ -126,12 +151,14 @@ func WithCondition(c func(r *http.Request) bool) Option {
 // RoundTrip only process first redirect at present
 // fix resource release
 func (roundTripper *DFRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
-	if req.Header.Get("x-nydus-proxy-healthcheck") != "" {
-		return &http.Response{
-			StatusCode: 200,
-			// add a body with no data
-			Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
-		}, nil
+	ret, resp, err := roundTripper.directReturn(req)
+	if ret {
+		return resp, err
+	}
+
+	ret, resp, err = roundTripper.isNumericalResult(req)
+	if ret {
+		return resp, err
 	}
 
 	if roundTripper.ShouldUseDfget(req) {
@@ -139,7 +166,10 @@ func (roundTripper *DFRoundTripper) RoundTrip(req *http.Request) (*http.Response
 		// result for different requests
 		req.Header.Del("Accept-Encoding")
 		logrus.Debugf("round trip with dfget: %s", req.URL.String())
-		if res, err := roundTripper.download(req, req.URL.String()); err == nil || !exception.IsNotAuth(err) {
+
+		ctx := context.WithValue(req.Context(), "numericalWare", roundTripper.nWare)
+		ctx = context.WithValue(ctx, "key", uuid.New())
+		if res, err := roundTripper.download(req.WithContext(ctx), req.URL.String()); err == nil || !exception.IsNotAuth(err) {
 			return res, err
 		}
 	}
@@ -158,20 +188,10 @@ func (roundTripper *DFRoundTripper) download(req *http.Request, urlString string
 		return nil, err
 	}
 
-	// fileReq, err := http.NewRequest("GET", "file:///"+dstPath, nil)
-	// if err != nil {
-	// 	return nil, err
-	// }
-	//
-	// response, err := roundTripper.Round2.RoundTrip(fileReq)
-	// if err == nil {
-	// 	response.Header.Set("Content-Disposition", "attachment; filename="+dstPath)
-	// } else {
-	// 	logrus.Errorf("read response from file:%s error:%v", dstPath, err)
-	// }
 	resp := &http.Response{
 		StatusCode: 200,
-		Body:       ioutil.NopCloser(reader),
+		Body:       NewNumericalReader(reader, time.Now(), req.Context().Value("key").(string), roundTripper.nWare),
 	}
 	return resp, nil
-	// return response, err
 }
@@ -184,6 +204,12 @@ func (roundTripper *DFRoundTripper) downloadByGetter(ctx context.Context, url st
 }
 
 func (roundTripper *DFRoundTripper) downloadByStream(ctx context.Context, url string, header map[string][]string, name string) (io.Reader, error) {
+	// guard the extreme config to avoid a nil pointer dereference when it
+	// is not configured
+	if roundTripper.config != nil && roundTripper.config.Extreme != nil {
+		taskID := httputils.GetTaskIDFromHeader(url, header, roundTripper.config.Extreme.SpecKeyOfTaskID)
+		if taskID != "" {
+			logrus.Infof("start download url in extreme mode: %s to %s in repo", url, name)
+			return roundTripper.ExtremeDownloader.DownloadStreamContext(ctx, url, header, name)
+		}
+	}
+
 	logrus.Infof("start download url:%s to %s in repo", url, name)
 	return roundTripper.StreamDownloader.DownloadStreamContext(ctx, url, header, name)
 }
@@ -193,3 +219,48 @@ func (roundTripper *DFRoundTripper) downloadByStream(ctx context.Context, url st
 func NeedUseGetter(req *http.Request) bool {
 	return req.Method == http.MethodGet && layerReg.MatchString(req.URL.Path)
 }
+
+// directReturn short-circuits requests carrying the configured
+// direct-return header with an empty 200 response.
+func (roundTripper *DFRoundTripper) directReturn(req *http.Request) (bool, *http.Response, error) {
+	// guard the extreme config to avoid a nil pointer dereference when it
+	// is not configured
+	if roundTripper.config == nil || roundTripper.config.Extreme == nil {
+		return false, nil, nil
+	}
+
+	if roundTripper.config.Extreme.SpecKeyOfDirectRet != "" &&
+		req.Header.Get(roundTripper.config.Extreme.SpecKeyOfDirectRet) != "" {
+		return true, &http.Response{
+			StatusCode: 200,
+			// add a body with no data
+			Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
+		}, nil
+	}
+
+	return false, nil, nil
+}
+
+// isNumericalResult serves the collected timing metrics when the request
+// carries the x-numerical-ware header.
+func (roundTripper *DFRoundTripper) isNumericalResult(req *http.Request) (bool, *http.Response, error) {
+	if 
req.Header.Get("x-numerical-ware") == "" { + return false, nil, nil + } + + var baseLine int64 = 0 + var msgErr error + baseLineStr := req.Header.Get("x-numerical-ware-baseline") + if baseLineStr != "" { + bl, err := strconv.ParseInt(baseLineStr, 10, 64) + if err != nil { + msgErr = fmt.Errorf("base line parse failed: %v", err) + } + + baseLine = bl + } + + reset := req.Header.Get("x-numerical-ware-reset") + if strings.TrimSpace(reset) == "true" { + defer roundTripper.nWare.Reset() + } + + rs := roundTripper.nWare.OutputWithBaseLine(baseLine) + rs.Err = msgErr + rsData,_ := json.Marshal(rs) + + return true, &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewReader(rsData)), + }, nil +} diff --git a/dfget/config/constants.go b/dfget/config/constants.go index 3e6d53301..9f6c87952 100644 --- a/dfget/config/constants.go +++ b/dfget/config/constants.go @@ -72,11 +72,10 @@ const ( StrPieceSize = "pieceSize" StrDataDir = "dataDir" StrTotalLimit = "totalLimit" + StrOther = "other" StrBytes = "bytes" - - StrDigest = "X-dragonfly-digest" -) + ) /* piece meta */ const ( @@ -116,6 +115,7 @@ const ( LocalHTTPPathClient = "/client/" LocalHTTPPathRate = "/rate/" LocalHTTPPing = "/server/ping" + LocalHTTPFETCH = "/fetch" DataExpireTime = 3 * time.Minute ServerAliveTime = 5 * time.Minute diff --git a/dfget/core/api/supernode_api.go b/dfget/core/api/supernode_api.go index 6874ba84d..765b1bcb1 100644 --- a/dfget/core/api/supernode_api.go +++ b/dfget/core/api/supernode_api.go @@ -39,6 +39,7 @@ const ( peerServiceDownPath = "/peer/service/down" metricsReportPath = "/task/metrics" fetchP2PNetworkPath = "/peer/network" + peerHeartBeatPath = "/peer/heartbeat" ) // NewSupernodeAPI creates a new instance of SupernodeAPI with default value. @@ -61,6 +62,7 @@ type SupernodeAPI interface { FetchP2PNetworkInfo(node string, start int, limit int, req *types.FetchP2PNetworkInfoRequest) (resp *types.FetchP2PNetworkInfoResponse, e error) ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error) ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, e error) + HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.BaseResponse, e error) } type supernodeAPI struct { @@ -135,7 +137,11 @@ func (api *supernodeAPI) ServiceDown(node string, taskID string, cid string) ( url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s", api.Scheme, node, peerServiceDownPath, taskID, cid) + logrus.Infof("Call ServiceDown, node: %s, taskID: %s, cid: %s", node, taskID, cid) + resp = new(types.BaseResponse) + resp.Code = constants.CodeGetPeerDown + if e = api.get(url, resp); e != nil { logrus.Errorf("failed to send service down,err: %v", e) return nil, e @@ -242,6 +248,10 @@ func (api *supernodeAPI) FetchP2PNetworkInfo(node string, start int, limit int, if code, body, err = api.HTTPClient.PostJSON(url, req, api.Timeout); err != nil { return nil, err } + + logrus.Infof("in FetchP2PNetworkInfo, req url: %s, timeout: %v, body: %v", url, api.Timeout, req) + logrus.Infof("in FetchP2PNetworkInfo, resp code: %d, body: %s", code, string(body)) + if !httputils.HTTPStatusOk(code) { return nil, fmt.Errorf("%d:%s", code, body) } @@ -258,13 +268,17 @@ func (api *supernodeAPI) ReportResource(node string, req *types.RegisterRequest) body []byte ) url := fmt.Sprintf("%s://%s%s", - api.Scheme, node, peerPullPieceTaskPath) + api.Scheme, node, peerRegisterPath) header := map[string]string{ "X-report-resource": "true", } if code, body, err = 
api.HTTPClient.PostJSONWithHeaders(url, header, req, api.Timeout); err != nil {
 		return nil, err
 	}
+
+	logrus.Infof("ReportResource, url: %s, header: %v, req: %v, " +
+		"code: %d, body: %s", url, header, req, code, string(body))
+
 	if !httputils.HTTPStatusOk(code) {
 		return nil, fmt.Errorf("%d:%s", code, body)
 	}
@@ -283,13 +297,36 @@ func (api *supernodeAPI) ReportResourceDeleted(node string, taskID string, cid s
 		"X-report-resource": "true",
 	}
 
+	logrus.Infof("Call ReportResourceDeleted, node: %s, taskID: %s, cid: %s, " +
+		"url: %s, header: %v", node, taskID, cid, url, header)
+
+	resp = new(types.BaseResponse)
+	resp.Code = constants.Success
+
+	if err = api.get(url, resp); err != nil {
+		logrus.Errorf("failed to send resource delete, err: %v", err)
+		return nil, err
+	}
+	if resp.Code != constants.CodeGetPeerDown {
+		logrus.Errorf("failed to send resource delete to supernode: api response code is %d not equal to %d", resp.Code, constants.CodeGetPeerDown)
+	}
+
+	return
+}
+
+func (api *supernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.BaseResponse, err error) {
+	url := fmt.Sprintf("%s://%s%s?ip=%s&port=%d&cid=%s",
+		api.Scheme, node, peerHeartBeatPath, req.IP, req.Port, req.CID)
+
 	resp = new(types.BaseResponse)
-	if err = api.getWithHeaders(url, header, resp); err != nil {
-		logrus.Errorf("failed to send resource delete,err: %v", err)
+	if err = api.get(url, resp); err != nil {
+		logrus.Errorf("failed to send heart beat, err: %v", err)
 		return nil, err
 	}
+
 	if resp.Code != constants.Success {
-		logrus.Errorf("failed to send send resource delete to supernode: api response code is %d not equal to %d", resp.Code, constants.Success)
+		logrus.Errorf("failed to send heart beat to supernode: api response code is %d not equal to %d", resp.Code, constants.Success)
 	}
+
 	return
 }
diff --git a/dfget/core/api/uploader_api.go b/dfget/core/api/uploader_api.go
index 77dc3b12c..17f473a44 100644
--- a/dfget/core/api/uploader_api.go
+++ b/dfget/core/api/uploader_api.go
@@ -17,13 +17,18 @@
 package api
 
 import (
+	"encoding/base64"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"strconv"
 	"time"
 
 	"github.com/dragonflyoss/Dragonfly/dfget/config"
+	"github.com/dragonflyoss/Dragonfly/dfget/types"
 	"github.com/dragonflyoss/Dragonfly/pkg/httputils"
+
+	"github.com/sirupsen/logrus"
 )
 
 // UploaderAPI defines the communication methods between dfget and uploader.
@@ -40,6 +45,9 @@ type UploaderAPI interface {
 
 	// PingServer send a request to determine whether the server has started.
 	PingServer(ip string, port int) bool
+
+	// FetchLocalTask fetches the tasks cached by the local peer server.
+	FetchLocalTask(ip string, port int) (*types.FetchLocalTaskInfo, error)
 }
 
 // uploaderAPI is an implementation of interface UploaderAPI.
@@ -74,13 +82,19 @@ func (u *uploaderAPI) CheckServer(ip string, port int, req *CheckServerRequest)
 }
 
 func (u *uploaderAPI) FinishTask(ip string, port int, req *FinishTaskRequest) error {
+	otherData, err := json.Marshal(req.Other)
+	if err != nil {
+		return err
+	}
+
 	url := fmt.Sprintf("http://%s:%d%sfinish?"+
 		config.StrTaskFileName+"=%s&"+
 		config.StrTaskID+"=%s&"+
 		config.StrClientID+"=%s&"+
-		config.StrSuperNode+"=%s",
+		config.StrSuperNode+"=%s&"+
+		config.StrOther+"=%s",
 		ip, port, config.LocalHTTPPathClient,
-		req.TaskFileName, req.TaskID, req.ClientID, req.Node)
+		req.TaskFileName, req.TaskID, req.ClientID, req.Node, base64.StdEncoding.EncodeToString(otherData))
 
 	code, body, err := httputils.Get(url, u.timeout)
 	if code == http.StatusOK {
@@ -94,6 +108,23 @@ func (u *uploaderAPI) FinishTask(ip string, port int, req *FinishTaskRequest) er
 func (u *uploaderAPI) PingServer(ip string, port int) bool {
 	url := fmt.Sprintf("http://%s:%d%s", ip, port, config.LocalHTTPPing)
+	logrus.Debugf("ping server url: %s", url)
 	code, _, _ := httputils.Get(url, u.timeout)
 	return code == http.StatusOK
 }
+
+// FetchLocalTask fetches the tasks cached by the local peer server.
+func (u *uploaderAPI) FetchLocalTask(ip string, port int) (*types.FetchLocalTaskInfo, error) {
+	url := fmt.Sprintf("http://%s:%d%s", ip, port, config.LocalHTTPFETCH)
+	logrus.Infof("fetch server url: %s", url)
+	code, body, err := httputils.Get(url, u.timeout)
+	if err != nil {
+		return nil, err
+	}
+	if code != http.StatusOK {
+		return nil, fmt.Errorf("get code: %d, resp: %s", code, string(body))
+	}
+
+	result := &types.FetchLocalTaskInfo{}
+	if err := json.Unmarshal(body, result); err != nil {
+		return nil, fmt.Errorf("failed to decode %s: %v", string(body), err)
+	}
+
+	return result, nil
+}
diff --git a/dfget/core/api/uploader_api_types.go b/dfget/core/api/uploader_api_types.go
index 50433d2df..a431a36c3 100644
--- a/dfget/core/api/uploader_api_types.go
+++ b/dfget/core/api/uploader_api_types.go
@@ -38,4 +38,13 @@ type FinishTaskRequest struct {
 	TaskID   string `request:"taskID"`
 	ClientID string `request:"cid"`
 	Node     string `request:"superNode"`
+	Other    FinishTaskOther `request:"other"`
+}
+
+// FinishTaskOther carries the extra task metadata that dfget reports to the
+// local uploader when a task finishes.
+type FinishTaskOther struct {
+	RawURL     string   `json:"rawURL"`
+	TaskURL    string   `json:"taskURL"`
+	FileLength int64    `json:"fileLength"`
+	Headers    []string `json:"headers"`
+	SpecReport bool     `json:"specReport"`
 }
diff --git a/dfget/core/core.go b/dfget/core/core.go
index 89426035f..7923b596b 100644
--- a/dfget/core/core.go
+++ b/dfget/core/core.go
@@ -176,6 +176,11 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
 		if e.Code == constants.CodeNeedAuth {
 			return nil, e
 		}
+
+		if e.Code == constants.CodeNOURL || e.Code == constants.CodeReturnSrc {
+			return nil, e
+		}
+
 		cfg.BackSourceReason = config.BackSourceReasonRegisterFail
 		panic(e.Error())
 	}
diff --git a/dfget/core/dfget.go b/dfget/core/dfget.go
index 414b66702..256a07c57 100644
--- a/dfget/core/dfget.go
+++ b/dfget/core/dfget.go
@@ -65,6 +65,10 @@ func (df *dfGet) GetReader(ctx context.Context, cfg *config.Config) (io.Reader,
 	}
 
 	if result, err = registerToSuperNode(cfg, register); err != nil {
+		// propagate typed errors (such as CodeNOURL/CodeReturnSrc) unchanged
+		if _, ok := err.(*errortypes.DfError); ok {
+			return nil, err
+		}
+
 		return nil, errortypes.New(config.CodeRegisterError, err.Error())
 	}
diff --git a/dfget/core/downloader/p2p_downloader/client_stream_writer.go b/dfget/core/downloader/p2p_downloader/client_stream_writer.go
index 963233e6d..c24cd0253 100644
--- a/dfget/core/downloader/p2p_downloader/client_stream_writer.go
+++ b/dfget/core/downloader/p2p_downloader/client_stream_writer.go
@@ -19,6 +19,7 @@ package downloader
 import (
 	"context"
 	"fmt"
+
"github.com/dragonflyoss/Dragonfly/dfdaemon/transport" "io" "time" @@ -62,11 +63,21 @@ type ClientStreamWriter struct { // api holds an instance of SupernodeAPI to interact with supernode. api api.SupernodeAPI cfg *config.Config + + noReport bool + expectReadSize int64 + alreadyReadSize int64 + ctx context.Context + nWare transport.NumericalWare + nKey string + + startTime time.Time } // NewClientStreamWriter creates and initialize a ClientStreamWriter instance. -func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter { +func NewClientStreamWriter(ctx context.Context, clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config, noReport bool, expectReadSize int64) *ClientStreamWriter { pr, pw := io.Pipe() + logrus.Infof("rate limit: %v, expect read size: %d", cfg.LocalLimit, expectReadSize) limitReader := limitreader.NewLimitReader(pr, int64(cfg.LocalLimit), cfg.Md5 != "") clientWriter := &ClientStreamWriter{ clientQueue: clientQueue, @@ -76,7 +87,21 @@ func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *c api: api, cfg: cfg, cache: make(map[int]*Piece), + noReport: noReport, + expectReadSize: expectReadSize, + ctx: ctx, + } + + nWareObj, ok := ctx.Value("numericalWare").(transport.NumericalWare) + if ok { + clientWriter.nWare = nWareObj } + + nKey, ok := ctx.Value("key").(string) + if ok { + clientWriter.nKey = nKey + } + return clientWriter } @@ -95,6 +120,7 @@ func (csw *ClientStreamWriter) PostRun(ctx context.Context) (err error) { func (csw *ClientStreamWriter) Run(ctx context.Context) { for { item := csw.clientQueue.Poll() + logrus.Infof("in ClientStreamWriter Run, poll item: %v", item) state, ok := item.(string) if ok && state == last { break @@ -120,6 +146,7 @@ func (csw *ClientStreamWriter) Run(ctx context.Context) { } } + logrus.Infof("success to return data to request") csw.pipeWriter.Close() close(csw.finish) } @@ -136,13 +163,16 @@ func (csw *ClientStreamWriter) write(piece *Piece) error { // TODO csw.p2pPattern err := csw.writePieceToPipe(piece) - if err == nil { + if err == nil && !csw.noReport { go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime)) } return err } func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error { + var( + startTime time.Time = time.Now() + ) for { // must write piece by order // when received PieceNum is greater then pieceIndex, cache it @@ -156,7 +186,12 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error { break } - _, err := io.Copy(csw.pipeWriter, p.RawContent()) + bufReader := p.RawContent() + if csw.nWare != nil { + csw.nWare.Add(csw.nKey, transport.CSWWriteName, time.Since(startTime).Nanoseconds()) + } + csw.startTime = time.Now() + _, err := io.Copy(csw.pipeWriter, bufReader) if err != nil { return err } @@ -177,6 +212,14 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error { func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) { n, err = csw.limitReader.Read(p) + csw.alreadyReadSize += int64(n) + if csw.expectReadSize > 0 && csw.alreadyReadSize >= csw.expectReadSize { + go csw.notifyCloseStream() + if csw.nWare != nil { + csw.nWare.Add(csw.nKey, transport.RequestReadName, time.Since(csw.startTime).Nanoseconds()) + } + } + // all data received, calculate md5 if err == io.EOF && csw.cfg.Md5 != "" { realMd5 := csw.limitReader.Md5() @@ -186,3 +229,8 @@ func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) { } return n, err } + +func (csw *ClientStreamWriter) 
notifyCloseStream() {
+	// notify the main loop to break
+	csw.clientQueue.Put(last)
+}
diff --git a/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go b/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go
index 0a79e58ee..58682225a 100644
--- a/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go
+++ b/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go
@@ -18,6 +18,7 @@ package downloader
 
 import (
 	"bytes"
+	"context"
 	"io"
 	"sort"
 
@@ -93,7 +94,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
 	copy(cases2, cases)
 
 	cfg := &config.Config{}
-	csw := NewClientStreamWriter(nil, nil, cfg)
+	csw := NewClientStreamWriter(context.Background(), nil, nil, cfg, false, 0)
 	go func() {
 		for _, v := range cases2 {
 			err := csw.writePieceToPipe(v.piece)
diff --git a/dfget/core/downloader/p2p_downloader/p2p_downloader.go b/dfget/core/downloader/p2p_downloader/p2p_downloader.go
index f02bcb634..bb80a8e29 100644
--- a/dfget/core/downloader/p2p_downloader/p2p_downloader.go
+++ b/dfget/core/downloader/p2p_downloader/p2p_downloader.go
@@ -166,7 +166,7 @@ func (p2p *P2PDownloader) RunStream(ctx context.Context) (io.Reader, error) {
 	if !p2p.streamMode {
 		return nil, fmt.Errorf("streamMode disable, should be enabled")
 	}
-	clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.API, p2p.cfg)
+	clientStreamWriter := NewClientStreamWriter(ctx, p2p.clientQueue, p2p.API, p2p.cfg, false, 0)
 	go func() {
 		err := p2p.run(ctx, clientStreamWriter)
 		if err != nil {
diff --git a/dfget/core/downloader/p2p_downloader/power_client.go b/dfget/core/downloader/p2p_downloader/power_client.go
index 3ffffd0fe..fde48c86e 100644
--- a/dfget/core/downloader/p2p_downloader/power_client.go
+++ b/dfget/core/downloader/p2p_downloader/power_client.go
@@ -18,7 +18,10 @@ package downloader
 
 import (
 	"bytes"
+	"crypto/md5"
+	"encoding/binary"
 	"fmt"
+	"hash"
 	"io"
 	"net/http"
 	"strings"
@@ -76,6 +79,43 @@ type PowerClient struct {
 	clientError *types.ClientErrorRequest
 }
 
+// PowerClientConfig bundles everything NewPowerClient needs.
+type PowerClientConfig struct {
+	// TaskID is a string which represents a unique task.
+	TaskID string
+	// Node indicates the IP address of the currently registered supernode.
+	Node string
+	// PieceTask is the data returned when the piece task is pulled
+	// successfully and the task is continuing.
+	PieceTask *types.PullPieceTaskResponseContinueData
+
+	Cfg *config.Config
+	// Queue maintains a queue of tasks that are to be downloaded.
+	// When a download fails, the piece is requeued.
+	Queue queue.Queue
+	// ClientQueue maintains a queue of tasks that need to be written to disk.
+	// A piece will be put into this queue after it is downloaded successfully.
+	ClientQueue queue.Queue
+
+	// RateLimiter limits the download speed.
+	RateLimiter *ratelimiter.RateLimiter
+
+	// DownloadAPI holds an instance of DownloadAPI.
+	DownloadAPI api.DownloadAPI
+}
+
+// NewPowerClient creates a PowerClient from the given config.
+func NewPowerClient(config *PowerClientConfig) *PowerClient {
+	return &PowerClient{
+		taskID:      config.TaskID,
+		node:        config.Node,
+		pieceTask:   config.PieceTask,
+		cfg:         config.Cfg,
+		queue:       config.Queue,
+		clientQueue: config.ClientQueue,
+		rateLimiter: config.RateLimiter,
+		downloadAPI: config.DownloadAPI,
+	}
+}
+
 // Run starts run the task.
func (pc *PowerClient) Run() error {
 	startTime := time.Now()
@@ -108,13 +148,19 @@ func (pc *PowerClient) ClientError() *types.ClientErrorRequest {
 }
 
 func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
+	var (
+		resp          *http.Response
+		err           error
+		md5HashReader hash.Hash
+	)
+
 	pieceMetaArr := strings.Split(pc.pieceTask.PieceMd5, ":")
 	pieceMD5 := pieceMetaArr[0]
 	dstIP := pc.pieceTask.PeerIP
 	peerPort := pc.pieceTask.PeerPort
 
 	// check that the target download peer is available
-	if dstIP != pc.node {
+	if !pc.pieceTask.DirectSource && dstIP != pc.node {
 		if _, e = httputils.CheckConnect(dstIP, peerPort, -1); e != nil {
 			return nil, e
 		}
@@ -123,7 +169,20 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
 	// send download request
 	startTime := time.Now()
 	timeout := netutils.CalculateTimeout(int64(pc.pieceTask.PieceSize), pc.cfg.MinRate, config.DefaultMinRate, 10*time.Second)
-	resp, err := pc.downloadAPI.Download(dstIP, peerPort, pc.createDownloadRequest(), timeout)
+	if pc.pieceTask.DirectSource {
+		header := map[string]string{}
+		h := http.Header(pc.pieceTask.Header)
+		for k := range pc.pieceTask.Header {
+			header[k] = h.Get(k)
+		}
+		resp, err = httputils.HTTPGetTimeout(pc.pieceTask.Url, header, timeout)
+		logrus.Debugf("in downloadPiece by returnSrc, url: %s, header: %v, err: %v", pc.pieceTask.Url, header, err)
+	} else {
+		createReq := pc.createDownloadRequest()
+		resp, err = pc.downloadAPI.Download(dstIP, peerPort, createReq, timeout)
+		logrus.Debugf("in downloadPiece by p2p, dstIP: %s, peerPort: %d, req: %v, err: %v", dstIP, peerPort, createReq, err)
+	}
+
 	if err != nil {
 		return nil, err
 	}
@@ -139,15 +198,39 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
 		return nil, errortypes.New(resp.StatusCode, pc.readBody(resp.Body))
 	}
 
+	content = &bytes.Buffer{}
+	if pieceMD5 != "" {
+		md5HashReader = md5.New()
+	}
+
+	if pc.pieceTask.DirectSource {
+		// prepend the piece head when downloading directly from the source
+		// URL, so the content matches the piece framing peers expect
+		pieceSize := pc.pieceTask.PieceSize
+		buf := make([]byte, 128)
+		binary.BigEndian.PutUint32(buf, uint32((pieceSize)|(pieceSize)<<4))
+		content.Write(buf[:config.PieceHeadSize])
+		if md5HashReader != nil {
+			md5HashReader.Write(buf[:config.PieceHeadSize])
+		}
+	}
+
 	// start to read data from resp
 	// use limitReader to limit the download speed
-	limitReader := limitreader.NewLimitReaderWithLimiter(pc.rateLimiter, resp.Body, pieceMD5 != "")
-	content = &bytes.Buffer{}
+	limitReader := limitreader.NewLimitReaderWithLimiterAndMD5Sum(resp.Body, pc.rateLimiter, md5HashReader)
+
 	if pc.total, e = content.ReadFrom(limitReader); e != nil {
 		return nil, e
 	}
 	pc.readCost = time.Since(startTime)
 
+	if pc.pieceTask.DirectSource {
+		// append the piece tail byte and fold it into the md5 sum as well
+		if md5HashReader != nil {
+			md5HashReader.Write([]byte{config.PieceTailChar})
+		}
+		content.Write([]byte{config.PieceTailChar})
+	}
+
 	// Verify md5 code
 	if realMd5 := limitReader.Md5(); realMd5 != pieceMD5 {
 		pc.initFileMd5NotMatchError(dstIP, realMd5, pieceMD5)
@@ -217,3 +300,7 @@ func (pc *PowerClient) readBody(body io.ReadCloser) string {
 	}
 	return strings.TrimSpace(buf.String())
 }
+
+// CostReadTime returns the time spent reading the piece content.
+func (pc *PowerClient) CostReadTime() time.Duration {
+	return pc.readCost
+}
diff --git a/dfget/core/helper/test_helper.go b/dfget/core/helper/test_helper.go
index 3ade007d4..a469cbde1 100644
--- a/dfget/core/helper/test_helper.go
+++ b/dfget/core/helper/test_helper.go
@@ -17,9 +17,7 @@
 package helper
 
 import (
-	"encoding/json"
 	"fmt"
-	"github.com/dragonflyoss/Dragonfly/pkg/httputils"
 	"io"
"io/ioutil" "math/rand" @@ -191,6 +189,10 @@ func (api *MockSupernodeAPI) ReportResourceDeleted(node string, taskID string, c return nil, nil } +func (api *MockSupernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.BaseResponse, e error) { + return nil, nil +} + // CreateRegisterFunc creates a mock register function. func CreateRegisterFunc() RegisterFuncType { var newResponse = func(code int, msg string) *types.RegisterResponse { diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index 0176415cd..71aeb7025 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -80,8 +80,10 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes logrus.Errorf("register to node:%s error:%v", nodes[i], e) continue } + if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth || - resp.Code == constants.CodeURLNotReachable { + resp.Code == constants.CodeURLNotReachable || resp.Code == constants.CodeNOURL || + resp.Code == constants.CodeReturnSrc { break } if resp.Code == constants.CodeWaitAuth && retryTimes < 3 { diff --git a/dfget/core/uploader/cache.go b/dfget/core/uploader/cache.go new file mode 100644 index 000000000..78c0bdb5e --- /dev/null +++ b/dfget/core/uploader/cache.go @@ -0,0 +1,17 @@ +package uploader + +import "bytes" + +type cacheBuffer struct { + // is cache valid + size int64 + valid bool + buf *bytes.Buffer +} + +func newCacheBuffer() *cacheBuffer { + return &cacheBuffer{ + valid: false, + buf: nil, + } +} \ No newline at end of file diff --git a/dfget/core/uploader/peer_server.go b/dfget/core/uploader/peer_server.go index bdc451637..a4783e691 100644 --- a/dfget/core/uploader/peer_server.go +++ b/dfget/core/uploader/peer_server.go @@ -17,14 +17,18 @@ package uploader import ( + "bytes" "context" + "encoding/base64" "encoding/binary" "encoding/json" "fmt" "io" + "io/ioutil" "net" "net/http" "os" + "path/filepath" "strconv" "strings" "sync" @@ -37,6 +41,9 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/limitreader" "github.com/dragonflyoss/Dragonfly/pkg/ratelimiter" "github.com/dragonflyoss/Dragonfly/version" + apitypes "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/pkg/constants" + "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -52,6 +59,14 @@ func newPeerServer(cfg *config.Config, port int) *peerServer { api: api.NewSupernodeAPI(), } + s.syncTaskContainer = &taskContainer{ + syncTaskMap: sync.Map{}, + ps: s, + } + + logrus.Infof("peer server config: %v, supernodes: %v", cfg, cfg.Nodes) + + s.reload() r := s.initRouter() s.Server = &http.Server{ Addr: net.JoinHostPort(s.host, strconv.Itoa(port)), @@ -68,7 +83,7 @@ func newPeerServer(cfg *config.Config, port int) *peerServer { type peerServer struct { cfg *config.Config - // finished indicates whether the peer server is shutdown + // Finished indicates whether the peer server is shutdown finished chan struct{} // server related fields @@ -82,19 +97,69 @@ type peerServer struct { // totalLimitRate is the total network bandwidth shared by tasks on the same host totalLimitRate int - // syncTaskMap stores the meta name of tasks on the host + // syncTaskContainer stores the meta name of tasks on the host + syncTaskContainer *taskContainer +} + +type taskContainer struct { syncTaskMap sync.Map + ps *peerServer +} + +func (t *taskContainer) Store(taskFileName string, tc *taskConfig) { + t.syncTaskMap.Store(taskFileName, tc) + 
t.syncToLocalFs(taskFileName, tc)
+}
+
+func (t *taskContainer) Load(taskFileName string) (interface{}, bool) {
+	return t.syncTaskMap.Load(taskFileName)
+}
+
+func (t *taskContainer) Range(f func(key, value interface{}) bool) {
+	t.syncTaskMap.Range(f)
+}
+
+func (t *taskContainer) Delete(taskFileName string) {
+	t.syncTaskMap.Delete(taskFileName)
+	fp := t.ps.taskMetaFilePath(taskFileName)
+	fpBak := t.ps.taskMetaFileBakPath(taskFileName)
+
+	os.Remove(fp)
+	os.Remove(fpBak)
+}
+
+// syncToLocalFs persists the task config: it writes the .bak file first and
+// then renames it over the .meta file, so readers never see a partial write.
+func (t *taskContainer) syncToLocalFs(key string, tc *taskConfig) {
+	fpBak := t.ps.taskMetaFileBakPath(key)
+	data, err := json.Marshal(tc)
+	if err != nil {
+		logrus.Warnf("failed to sync task %s to local fs: %v", key, err)
+		return
+	}
+
+	err = ioutil.WriteFile(fpBak, data, 0664)
+	if err != nil {
+		logrus.Warnf("failed to sync task %s to local fs: %v", key, err)
+		return
+	}
+
+	fp := t.ps.taskMetaFilePath(key)
+	err = os.Rename(fpBak, fp)
+	if err != nil {
+		logrus.Warnf("failed to sync task %s to local fs: %v", key, err)
+	}
+}
 
 // taskConfig refers to some name about peer task.
 type taskConfig struct {
-	taskID     string
-	rateLimit  int
-	cid        string
-	dataDir    string
-	superNode  string
-	finished   bool
-	accessTime time.Time
+	TaskID     string    `json:"taskID"`
+	RateLimit  int       `json:"rateLimit"`
+	Cid        string    `json:"cid"`
+	DataDir    string    `json:"dataDir"`
+	SuperNode  string    `json:"superNode"`
+	Finished   bool      `json:"finished"`
+	AccessTime time.Time `json:"accessTime"`
+	Other      *api.FinishTaskOther `json:"other"`
+
+	// cache is never persisted; it is rebuilt from the task file on reload
+	cache *cacheBuffer `json:"-"`
 }
 
 // uploadParam refers to all params needed in the handler of upload.
@@ -107,8 +172,148 @@ type uploadParam struct {
 	pieceNum int64
 }
 
+const (
+	taskMetaDir = "task-meta"
+)
+
+// reload restores the task metadata persisted under the meta dir and
+// re-registers the restored tasks to the supernode.
+func (ps *peerServer) reload() {
+	var (
+		localTaskConfig = map[string]*taskConfig{}
+		err             error
+	)
+
+	taskMetaPath := filepath.Join(ps.cfg.RV.MetaPath, taskMetaDir)
+	err = os.MkdirAll(taskMetaPath, 0744)
+	if err != nil {
+		panic(fmt.Sprintf("failed to init dir %s: %v", taskMetaPath, err))
+	}
+
+	localTaskConfig, err = ps.readTaskInfoFromDir(taskMetaPath)
+	if err != nil {
+		logrus.Warnf("failed to read task from local: %v", err)
+		return
+	}
+
+	logrus.Infof("try to reload task file")
+	ps.initLocalTask(localTaskConfig)
+	ps.registerTaskToSuperNode()
+}
+
+func (ps *peerServer) registerTaskToSuperNode() {
+	ps.syncTaskContainer.Range(func(key, value interface{}) bool {
+		taskFileName := key.(string)
+		tc := value.(*taskConfig)
+		ps.reportResource(taskFileName, tc)
+		return true
+	})
+}
+
+func (ps *peerServer) reportResource(taskFileName string, tc *taskConfig) {
+	// tasks stored via checkHandler have not finished yet and carry no
+	// Other metadata, so they cannot be reported
+	if tc.Other == nil {
+		logrus.Warnf("skip reporting task %s: no finish metadata", taskFileName)
+		return
+	}
+
+	req := &types.RegisterRequest{
+		RawURL:     tc.Other.RawURL,
+		TaskURL:    tc.Other.TaskURL,
+		Cid:        ps.cfg.RV.Cid,
+		IP:         ps.cfg.RV.LocalIP,
+		Port:       ps.cfg.RV.PeerPort,
+		Path:       taskFileName,
+		Md5:        ps.cfg.Md5,
+		Identifier: ps.cfg.Identifier,
+		Headers:    tc.Other.Headers,
+		Dfdaemon:   ps.cfg.DFDaemon,
+		Insecure:   ps.cfg.Insecure,
+		TaskId:     tc.TaskID,
+		FileLength: tc.Other.FileLength,
+	}
+
+	for _, node := range ps.cfg.Nodes {
+		resp, err := ps.api.ReportResource(node, req)
+		if err == nil && resp.Code == constants.Success {
+			logrus.Infof("success to report resource %v to supernode", req)
+			break
+		} else {
+			logrus.Errorf("failed to report resource %v to supernode, resp: %v, err: %v", req, resp, err)
+		}
+	}
+}
+
+func (ps *peerServer) readTaskInfoFromDir(dir string) (map[string]*taskConfig, error) {
+	result := map[string]*taskConfig{}
+
+	fin, err := 
ioutil.ReadDir(dir)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, fi := range fin {
+		taskFileName := ""
+		isBak := false
+
+		if strings.HasSuffix(fi.Name(), ".meta") {
+			taskFileName = strings.TrimSuffix(fi.Name(), ".meta")
+		} else if strings.HasSuffix(fi.Name(), ".meta.bak") {
+			taskFileName = strings.TrimSuffix(fi.Name(), ".meta.bak")
+			isBak = true
+		} else {
+			continue
+		}
+
+		fp := filepath.Join(dir, fi.Name())
+		meta, err := ioutil.ReadFile(fp)
+		if err != nil {
+			logrus.Errorf("failed to read meta data of task %s: %v", fi.Name(), err)
+			continue
+		}
+
+		tc := &taskConfig{}
+		err = json.Unmarshal(meta, tc)
+		if err != nil {
+			logrus.Errorf("failed to decode meta data of task %s: %v", fi.Name(), err)
+			continue
+		}
+
+		// prefer the normal .meta file; fall back to the .bak copy only
+		// when no .meta entry was read
+		if !isBak {
+			result[taskFileName] = tc
+		} else if _, exist := result[taskFileName]; !exist {
+			result[taskFileName] = tc
+		}
+	}
+
+	return result, nil
+}
+
+func (ps *peerServer) initLocalTask(m map[string]*taskConfig) {
+	for taskFileName, tc := range m {
+		tc.AccessTime = time.Now()
+
+		taskPath := helper.GetServiceFile(taskFileName, tc.DataDir)
+		_, err := os.Stat(taskPath)
+		if err != nil {
+			logrus.Warnf("failed to stat taskFile %s for task %s: %v", taskPath, tc.TaskID, err)
+			continue
+		}
+
+		tc.cache = newCacheBuffer()
+		ps.syncTaskContainer.Store(taskFileName, tc)
+		ps.syncCache(taskFileName)
+	}
+}
+
+func (ps *peerServer) taskMetaFilePath(taskFileName string) string {
+	return filepath.Join(ps.cfg.RV.MetaPath, taskMetaDir, fmt.Sprintf("%s.meta", taskFileName))
+}
+
+func (ps *peerServer) taskMetaFileBakPath(taskFileName string) string {
+	return filepath.Join(ps.cfg.RV.MetaPath, taskMetaDir, fmt.Sprintf("%s.meta.bak", taskFileName))
+}
+
 // ----------------------------------------------------------------------------
 // init method of peerServer
 
 func (ps *peerServer) initRouter() *mux.Router {
 	r := mux.NewRouter()
@@ -117,7 +322,7 @@ func (ps *peerServer) initRouter() *mux.Router {
 	r.HandleFunc(config.LocalHTTPPathCheck+"{commonFile:.*}", ps.checkHandler).Methods("GET")
 	r.HandleFunc(config.LocalHTTPPathClient+"finish", ps.oneFinishHandler).Methods("GET")
 	r.HandleFunc(config.LocalHTTPPing, ps.pingHandler).Methods("GET")
-
+	r.HandleFunc(config.LocalHTTPFETCH, ps.fetchHandler).Methods("GET")
 	return r
 }
 
@@ -148,6 +353,34 @@ func (ps *peerServer) uploadHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	// get task config
+	v, ok := ps.syncTaskContainer.Load(taskFileName)
+	if !ok {
+		rangeErrorResponse(w, fmt.Errorf("failed to get taskPath: %s", taskFileName))
+		logrus.Errorf("failed to find task config for %s", taskFileName)
+		return
+	}
+
+	task := v.(*taskConfig)
+	// sync access time
+	task.AccessTime = time.Now()
+
+	// the cache is nil until the task finishes, so guard before using it
+	if task.cache != nil && task.cache.valid {
+		// serve the upload from the in-memory cache
+		size = task.cache.size
+		if err = amendRange(size, true, up); err != nil {
+			rangeErrorResponse(w, err)
+			logrus.Errorf("failed to amend range of file %s: %v", taskFileName, err)
+			return
+		}
+
+		if err := ps.uploadPieceFromCache(task.cache, w, up); err != nil {
+			logrus.Errorf("failed to send range(%s) of file(%s): %v", rangeStr, taskFileName, err)
+		}
+
+		return
+	}
+
 	// Step2: get task file
 	if f, size, err = ps.getTaskFile(taskFileName); err != nil {
 		rangeErrorResponse(w, err)
@@ -169,6 +402,32 @@ func (ps *peerServer) uploadHandler(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+func (ps *peerServer) uploadPieceFromCache(c *cacheBuffer, w http.ResponseWriter, up *uploadParam) (e error) {
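+	// When padSize > 0 the cached bytes are framed exactly like a normal
+	// piece upload: a 4-byte big-endian head encoding readLen|pieceSize<<4,
+	// the payload, then a single tail byte, so clients can parse cached and
+	// file-backed responses identically.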
+	w.Header().Set(config.StrContentLength, strconv.FormatInt(up.length, 10))
+	sendHeader(w, http.StatusPartialContent)
+
+	readLen := up.length - up.padSize
+	buf := make([]byte, 256*1024)
+
+	if up.padSize > 0 {
+		binary.BigEndian.PutUint32(buf, uint32((readLen)|(up.pieceSize)<<4))
+		w.Write(buf[:config.PieceHeadSize])
+		defer w.Write([]byte{config.PieceTailChar})
+	}
+
+	brd := bytes.NewReader(c.buf.Bytes())
+	brd.Seek(up.start, io.SeekStart)
+	r := io.LimitReader(brd, readLen)
+	if ps.rateLimiter != nil {
+		lr := limitreader.NewLimitReaderWithLimiter(ps.rateLimiter, r, false)
+		_, e = io.CopyBuffer(w, lr, buf)
+	} else {
+		_, e = io.CopyBuffer(w, r, buf)
+	}
+
+	return
+}
+
 func (ps *peerServer) parseRateHandler(w http.ResponseWriter, r *http.Request) {
 	sendAlive(ps.cfg)
@@ -179,15 +438,15 @@ func (ps *peerServer) parseRateHandler(w http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
 		fmt.Fprint(w, err.Error())
 		logrus.Errorf("failed to convert rateLimit %v, %v", rateLimit, err)
 		return
 	}
 	sendSuccess(w)
 
-	// update the rateLimit of commonFile
-	if v, ok := ps.syncTaskMap.Load(taskFileName); ok {
+	// update the RateLimit of commonFile
+	if v, ok := ps.syncTaskContainer.Load(taskFileName); ok {
 		param := v.(*taskConfig)
-		param.rateLimit = clientRate
+		param.RateLimit = clientRate
 	}
 
 	// no need to calculate rate when totalLimitRate less than or equals zero.
@@ -224,9 +483,9 @@ func (ps *peerServer) checkHandler(w http.ResponseWriter, r *http.Request) {
 	dataDir := r.Header.Get(config.StrDataDir)
 
 	param := &taskConfig{
-		dataDir: dataDir,
+		DataDir: dataDir,
 	}
-	ps.syncTaskMap.Store(taskFileName, param)
+	ps.syncTaskContainer.Store(taskFileName, param)
 	fmt.Fprintf(w, "%s@%s", taskFileName, version.DFGetVersion)
 }
 
@@ -242,34 +501,84 @@ func (ps *peerServer) oneFinishHandler(w http.ResponseWriter, r *http.Request) {
 	taskID := r.FormValue(config.StrTaskID)
 	cid := r.FormValue(config.StrClientID)
 	superNode := r.FormValue(config.StrSuperNode)
+
+	otherSt := api.FinishTaskOther{}
+	// the "other" param is optional so that older dfget clients which do
+	// not send it keep working
+	if other := r.FormValue(config.StrOther); other != "" {
+		otherData, err := base64.StdEncoding.DecodeString(other)
+		if err != nil {
+			sendHeader(w, http.StatusBadRequest)
+			fmt.Fprintf(w, "invalid params, failed to decode other data: %v", err)
+			return
+		}
+
+		if err := json.Unmarshal(otherData, &otherSt); err != nil {
+			sendHeader(w, http.StatusBadRequest)
+			fmt.Fprintf(w, "invalid params, failed to unmarshal other data: %v", err)
+			return
+		}
+	}
+
 	if taskFileName == "" || taskID == "" || cid == "" {
 		sendHeader(w, http.StatusBadRequest)
 		fmt.Fprintf(w, "invalid params")
 		return
 	}
 
-	if v, ok := ps.syncTaskMap.Load(taskFileName); ok {
+	if v, ok := ps.syncTaskContainer.Load(taskFileName); ok {
 		task := v.(*taskConfig)
-		task.taskID = taskID
-		task.rateLimit = 0
-		task.cid = cid
-		task.superNode = superNode
-		task.finished = true
-		task.accessTime = time.Now()
+		task.TaskID = taskID
+		task.RateLimit = 0
+		task.Cid = cid
+		task.SuperNode = superNode
+		task.Finished = true
+		task.AccessTime = time.Now()
+		task.cache = newCacheBuffer()
+		task.Other = &otherSt
 	} else {
-		ps.syncTaskMap.Store(taskFileName, &taskConfig{
-			taskID:     taskID,
-			cid:        cid,
-			dataDir:    ps.cfg.RV.SystemDataDir,
-			superNode:  superNode,
-			finished:   true,
-			accessTime: time.Now(),
+		ps.syncTaskContainer.Store(taskFileName, &taskConfig{
+			TaskID:     taskID,
+			Cid:        cid,
+			DataDir:    ps.cfg.RV.SystemDataDir,
+			SuperNode:  superNode,
+			Finished:   true,
+			AccessTime: time.Now(),
+			cache:      
newCacheBuffer(),
+			Other:      &otherSt,
 		})
 	}
 
+	go ps.syncCache(taskFileName)
 	sendSuccess(w)
 	fmt.Fprintf(w, "success")
 }
 
+// syncCache loads the finished task file into the in-memory cache buffer.
+func (ps *peerServer) syncCache(taskFileName string) {
+	v, ok := ps.syncTaskContainer.Load(taskFileName)
+	if !ok {
+		return
+	}
+
+	task := v.(*taskConfig)
+	f, size, err := ps.getTaskFile(taskFileName)
+	if err != nil {
+		logrus.Errorf("in syncCache %s, failed to getTaskFile: %v", taskFileName, err)
+		return
+	}
+	defer f.Close()
+
+	buf := &bytes.Buffer{}
+	copySize, err := io.CopyN(buf, f, size)
+	if copySize != size {
+		logrus.Errorf("failed to syncCache %s from file %s, expected size %d, but got %d: %v",
+			taskFileName, f.Name(), size, copySize, err)
+		return
+	}
+
+	task.cache.buf = buf
+	task.cache.size = size
+	task.cache.valid = true
+
+	logrus.Infof("success to sync cache %s", taskFileName)
+}
+
 func (ps *peerServer) pingHandler(w http.ResponseWriter, r *http.Request) {
 	sendSuccess(w)
 	fmt.Fprintf(w, "success")
@@ -282,7 +591,7 @@ func (ps *peerServer) pingHandler(w http.ResponseWriter, r *http.Request) {
 func (ps *peerServer) getTaskFile(taskFileName string) (*os.File, int64, error) {
 	errSize := int64(-1)
 
-	v, ok := ps.syncTaskMap.Load(taskFileName)
+	v, ok := ps.syncTaskContainer.Load(taskFileName)
 	if !ok {
 		return nil, errSize, fmt.Errorf("failed to get taskPath: %s", taskFileName)
 	}
@@ -291,10 +600,10 @@ func (ps *peerServer) getTaskFile(taskFileName string) (*os.File, int64, error)
 		return nil, errSize, fmt.Errorf("failed to assert: %s", taskFileName)
 	}
 
-	// update the accessTime of taskFileName
-	tc.accessTime = time.Now()
+	// update the AccessTime of taskFileName
+	tc.AccessTime = time.Now()
 
-	taskPath := helper.GetServiceFile(taskFileName, tc.dataDir)
+	taskPath := helper.GetServiceFile(taskFileName, tc.DataDir)
 	fileInfo, err := os.Stat(taskPath)
 	if err != nil {
@@ -407,13 +716,13 @@ func (ps *peerServer) calculateRateLimit(clientRate int) int {
 	// for each key and value present in the map
 	f := func(key, value interface{}) bool {
 		if task, ok := value.(*taskConfig); ok {
-			if !task.finished {
-				total += task.rateLimit
+			if !task.Finished {
+				total += task.RateLimit
 			}
 		}
 		return true
 	}
-	ps.syncTaskMap.Range(f)
+	ps.syncTaskContainer.Range(f)
 
 	// calculate the rate limit again according to totalLimit
 	if total > ps.totalLimitRate {
@@ -460,14 +769,14 @@ func (ps *peerServer) waitForShutdown() {
 func (ps *peerServer) shutdown() {
 	// tell supernode this peer node is down and delete related files.
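+	// Tasks flagged with SpecReport are intentionally skipped here: they
+	// stay registered on the supernode and are cleaned up later through
+	// ReportResourceDeleted when they expire.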
-	ps.syncTaskMap.Range(func(key, value interface{}) bool {
+	ps.syncTaskContainer.Range(func(key, value interface{}) bool {
 		task, ok := value.(*taskConfig)
-		if ok {
-			ps.api.ServiceDown(task.superNode, task.taskID, task.cid)
-			serviceFile := helper.GetServiceFile(key.(string), task.dataDir)
+		// task.Other is nil for tasks that never finished; guard before
+		// dereferencing it
+		if ok && (task.Other == nil || !task.Other.SpecReport) {
+			ps.api.ServiceDown(task.SuperNode, task.TaskID, task.Cid)
+			serviceFile := helper.GetServiceFile(key.(string), task.DataDir)
 			os.Remove(serviceFile)
 			logrus.Infof("shutdown, remove task id:%s file:%s",
-				task.taskID, serviceFile)
+				task.TaskID, serviceFile)
 		}
 		return true
 	})
@@ -483,25 +792,31 @@ func (ps *peerServer) shutdown() {
 func (ps *peerServer) deleteExpiredFile(path string, info os.FileInfo, expireTime time.Duration) bool {
 	taskName := helper.GetTaskName(info.Name())
-	if v, ok := ps.syncTaskMap.Load(taskName); ok {
+	if v, ok := ps.syncTaskContainer.Load(taskName); ok {
 		task, ok := v.(*taskConfig)
-		if ok && !task.finished {
+		if ok && !task.Finished {
 			return false
 		}
 
-		var lastAccessTime = task.accessTime
+		var lastAccessTime = task.AccessTime
 		// use the bigger of access time and modify time to
 		// check whether the task is expired
-		if task.accessTime.Sub(info.ModTime()) < 0 {
+		if task.AccessTime.Sub(info.ModTime()) < 0 {
 			lastAccessTime = info.ModTime()
 		}
 		// if the last access time is expireTime ago
 		if time.Since(lastAccessTime) > expireTime {
 			if ok {
-				ps.api.ServiceDown(task.superNode, task.taskID, task.cid)
+				// SpecReport tasks are reported as deleted resources; all
+				// others (including unfinished ones with a nil Other) are
+				// reported as service down
+				if task.Other != nil && task.Other.SpecReport {
+					ps.api.ReportResourceDeleted(task.SuperNode, task.TaskID, task.Cid)
+				} else {
+					ps.api.ServiceDown(task.SuperNode, task.TaskID, task.Cid)
+				}
 			}
 			os.Remove(path)
-			ps.syncTaskMap.Delete(taskName)
+			ps.syncTaskContainer.Delete(taskName)
 			return true
 		}
 	} else {
@@ -539,3 +854,58 @@ func jsonStr(v interface{}) string {
 	b, _ := json.Marshal(v)
 	return string(b)
 }
+
+// fetchHandler returns the tasks cached by this peer.
+func (ps *peerServer) fetchHandler(w http.ResponseWriter, r *http.Request) {
+	if err := r.ParseForm(); err != nil {
+		sendHeader(w, http.StatusBadRequest)
+		fmt.Fprint(w, err.Error())
+		return
+	}
+
+	// todo: add filter ?
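+	// (such a filter could, for example, honor taskId or url query
+	// parameters; for now every finished task in the container is returned)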
+	result := []*apitypes.TaskFetchInfo{}
+
+	ps.syncTaskContainer.Range(func(key, value interface{}) bool {
+		taskFileName := key.(string)
+		tc := value.(*taskConfig)
+
+		// skip tasks that have not finished: they carry no Other metadata
+		// and dereferencing it would panic
+		if tc.Other == nil {
+			return true
+		}
+
+		taskInfo := &apitypes.TaskFetchInfo{
+			Pieces: []*apitypes.PieceInfo{
+				{
+					Path: taskFileName,
+				},
+			},
+			Task: &apitypes.TaskInfo{
+				ID:             tc.TaskID,
+				TaskURL:        tc.Other.TaskURL,
+				RawURL:         tc.Other.RawURL,
+				PieceSize:      int32(tc.Other.FileLength),
+				PieceTotal:     1,
+				FileLength:     tc.Other.FileLength,
+				HTTPFileLength: tc.Other.FileLength,
+				// Headers: tc.Other.Headers, todo: parse header
+				Md5:        ps.cfg.Md5,
+				Identifier: ps.cfg.Identifier,
+			},
+		}
+
+		result = append(result, taskInfo)
+		return true
+	})
+
+	resp := &types.FetchLocalTaskInfo{
+		Tasks: result,
+	}
+
+	respData, err := json.Marshal(resp)
+	if err != nil {
+		sendHeader(w, http.StatusInternalServerError)
+		fmt.Fprintf(w, "failed to encode %v: %v", resp, err)
+		return
+	}
+
+	sendHeader(w, http.StatusOK)
+	w.Write(respData)
+}
diff --git a/dfget/core/uploader/peer_server_test.go b/dfget/core/uploader/peer_server_test.go
index b07308ca8..2587acb93 100644
--- a/dfget/core/uploader/peer_server_test.go
+++ b/dfget/core/uploader/peer_server_test.go
@@ -306,11 +306,11 @@ func (s *PeerServerTestSuite) TestShutdown(c *check.C) {
 	ioutil.WriteFile(tmpFile, []byte("hello"), os.ModePerm)
 
 	ps := newPeerServer(cfg, 0)
-	ps.syncTaskMap.Store(taskName, &taskConfig{
-		cid:       "x",
-		superNode: "localhost",
-		taskID:    "b",
-		dataDir:   cfg.RV.SystemDataDir,
+	ps.syncTaskContainer.Store(taskName, &taskConfig{
+		Cid:       "x",
+		SuperNode: "localhost",
+		TaskID:    "b",
+		DataDir:   cfg.RV.SystemDataDir,
 	})
 	ps.api = &helper.MockSupernodeAPI{
 		ServiceDownFunc: func(ip string, taskID string, cid string) (*types.BaseResponse, error) {
@@ -339,10 +339,10 @@ func (s *PeerServerTestSuite) TestDeleteExpiredFile(c *check.C) {
 	}
 	var t = func(f bool) *taskConfig {
 		return &taskConfig{
-			taskID:     fmt.Sprintf("%d", rand.Int63()),
-			finished:   f,
-			dataDir:    cfg.RV.SystemDataDir,
-			accessTime: time.Now(),
+			TaskID:     fmt.Sprintf("%d", rand.Int63()),
+			Finished:   f,
+			DataDir:    cfg.RV.SystemDataDir,
+			AccessTime: time.Now(),
 		}
 	}
 
@@ -352,9 +352,9 @@ func (s *PeerServerTestSuite) TestDeleteExpiredFile(c *check.C) {
 		expire  time.Duration
 		deleted bool
 	}{
 		// delete finished and expired task file
 		{name: f(), task: t(true), expire: 0, deleted: true},
 		// don't delete finished but not expired task file
 		{name: f(), task: t(true), expire: time.Minute, deleted: false},
 		// don't delete unfinished task file
 		{name: f(), task: t(false), expire: 0, deleted: false},
@@ -373,8 +373,8 @@ func (s *PeerServerTestSuite) TestDeleteExpiredFile(c *check.C) {
 		filePath := helper.GetServiceFile(v.name, cfg.RV.SystemDataDir)
 		finished := ""
 		if v.task != nil {
-			ps.syncTaskMap.Store(v.name, v.task)
-			finished = fmt.Sprintf("%v", v.task.finished)
+			ps.syncTaskContainer.Store(v.name, v.task)
+			finished = fmt.Sprintf("%v", v.task.Finished)
 		}
 		info, _ := os.Stat(filePath)
 		deleted := ps.deleteExpiredFile(filePath, info, v.expire)
@@ -384,8 +384,8 @@ func (s *PeerServerTestSuite) TestDeleteExpiredFile(c *check.C) {
 		c.Assert(deleted, check.Equals, v.deleted, cmt)
 		c.Assert(fileutils.PathExist(filePath), check.Equals, !v.deleted, cmt)
 		if v.task != nil {
-			c.Assert(mark[v.task.taskID], check.Equals, v.deleted, cmt)
-			_, ok := ps.syncTaskMap.Load(v.name)
+			c.Assert(mark[v.task.TaskID], check.Equals, v.deleted, cmt)
+			_, ok := ps.syncTaskContainer.Load(v.name)
c.Assert(ok, check.Equals, !v.deleted, cmt) } } @@ -448,7 +448,7 @@ func (s *PeerServerTestSuite) TestParseRateHandler(c *check.C) { // normal test testRateLimit := 1000 - headers["rateLimit"] = strconv.Itoa(testRateLimit) + headers["RateLimit"] = strconv.Itoa(testRateLimit) if rr, err := testHandlerHelper(s.srv, &HandlerHelper{ method: http.MethodGet, url: config.LocalHTTPPathRate + file2000, @@ -471,8 +471,8 @@ func (s *PeerServerTestSuite) TestParseRateHandler(c *check.C) { } s.srv.totalLimitRate = 1000 - // wrong rateLimit test - headers["rateLimit"] = "foo" + // wrong RateLimit test + headers["RateLimit"] = "foo" if rr, err := testHandlerHelper(s.srv, &HandlerHelper{ method: http.MethodGet, url: config.LocalHTTPPathRate + file2000, @@ -511,8 +511,8 @@ func (s *PeerServerTestSuite) TestOneFinishHandler(c *check.C) { } exist := r() srv := newTestPeerServer(s.workHome) - srv.syncTaskMap.Store(exist.TaskFileName, &taskConfig{ - taskID: exist.TaskID, + srv.syncTaskContainer.Store(exist.TaskFileName, &taskConfig{ + TaskID: exist.TaskID, }) var cases = []struct { @@ -536,13 +536,13 @@ func (s *PeerServerTestSuite) TestOneFinishHandler(c *check.C) { continue } - t, ok := srv.syncTaskMap.Load(v.req.TaskFileName) + t, ok := srv.syncTaskContainer.Load(v.req.TaskFileName) c.Assert(ok, check.Equals, v.code == http.StatusOK) if ok { tt, ok := t.(*taskConfig) c.Assert(ok, check.Equals, true) - c.Assert(tt.rateLimit, check.Equals, 0) - c.Assert(tt.finished, check.Equals, true) + c.Assert(tt.RateLimit, check.Equals, 0) + c.Assert(tt.Finished, check.Equals, true) } } } diff --git a/dfget/core/uploader/uploader_helper_test.go b/dfget/core/uploader/uploader_helper_test.go index 5684d934f..6980fbe98 100644 --- a/dfget/core/uploader/uploader_helper_test.go +++ b/dfget/core/uploader/uploader_helper_test.go @@ -61,13 +61,13 @@ func newTestPeerServer(workHome string) (srv *peerServer) { return srv } -// initHelper creates a temporary file and store it in the syncTaskMap. +// initHelper creates a temporary file and store it in the syncTaskContainer. 
func initHelper(srv *peerServer, fileName, dataDir, content string) {
 	helper.CreateTestFile(helper.GetServiceFile(fileName, dataDir), content)
 	if srv != nil {
-		srv.syncTaskMap.Store(fileName, &taskConfig{
-			dataDir:   dataDir,
-			rateLimit: defaultRateLimit,
+		srv.syncTaskContainer.Store(fileName, &taskConfig{
+			DataDir:   dataDir,
+			RateLimit: defaultRateLimit,
 		})
 	}
 }
diff --git a/dfget/core/uploader/uploader_test.go b/dfget/core/uploader/uploader_test.go
index e24094adb..cc26b5a0d 100644
--- a/dfget/core/uploader/uploader_test.go
+++ b/dfget/core/uploader/uploader_test.go
@@ -275,9 +275,9 @@ func createTestFile(srv *peerServer, store bool, finished bool, expire time.Dura
 	os.Chtimes(taskFile, expireTime, expireTime)
 
 	if store {
-		srv.syncTaskMap.Store(name, &taskConfig{
-			finished:   finished,
-			accessTime: expireTime,
+		srv.syncTaskContainer.Store(name, &taskConfig{
+			Finished:   finished,
+			AccessTime: expireTime,
 		})
 	}
 	return name
diff --git a/dfget/types/fetch_p2p_networkinfo.go b/dfget/types/fetch_p2p_networkinfo.go
index f5218c613..dbff69449 100644
--- a/dfget/types/fetch_p2p_networkinfo.go
+++ b/dfget/types/fetch_p2p_networkinfo.go
@@ -31,23 +31,6 @@ type FetchP2PNetworkInfoResponse struct {
 }
 
 type FetchNetworkInfoDataResponse struct {
-	Nodes []*Node `json:"nodes"`
+	Nodes []*types.Node `json:"nodes"`
 }
-
-type Node struct {
-	// basic node info
-	Basic *types.PeerInfo `json:"basic"`
-	// extra node info
-	Extra *Extra `json:"extra"`
-	// the load of node, which as the schedule weight in peer schedule
-	Load int `json:"load"`
-	// the tasks in the peer node
-	Tasks []*types.TaskInfo `json:"tasks"`
-}
-
-type Extra struct {
-	Site string `json:"site"`
-	Rack string `json:"rack"`
-	Room string `json:"room"`
-	Idc  string `json:"idc"`
-}
diff --git a/dfget/types/pull_piece_task_response.go b/dfget/types/pull_piece_task_response.go
index 24c563893..b71b7c4c6 100644
--- a/dfget/types/pull_piece_task_response.go
+++ b/dfget/types/pull_piece_task_response.go
@@ -90,6 +90,10 @@ type PullPieceTaskResponseContinueData struct {
 	PeerPort int    `json:"peerPort"`
 	Path     string `json:"path"`
 	DownLink int    `json:"downLink"`
+
+	Url          string              `json:"url"`
+	Header       map[string][]string `json:"header"`
+	DirectSource bool
 }
 
 func (data *PullPieceTaskResponseContinueData) String() string {
diff --git a/dfget/types/types.go b/dfget/types/types.go
index 1697a8b24..8fe0f3f02 100644
--- a/dfget/types/types.go
+++ b/dfget/types/types.go
@@ -19,7 +19,15 @@
 // modules(systems). It's hard to read and maintain.
 package types
 
+import (
+	apitype "github.com/dragonflyoss/Dragonfly/apis/types"
+)
+
 // MetaInfo stores meta information of dfget.
type MetaInfo struct {
 	ServicePort int
 }
+
+// FetchLocalTaskInfo carries the tasks returned by the uploader's local
+// fetch API.
+type FetchLocalTaskInfo struct {
+	Tasks []*apitype.TaskFetchInfo
+}
\ No newline at end of file
diff --git a/pkg/constants/dfget_super_code.go b/pkg/constants/dfget_super_code.go
index 870045b4a..132870c82 100644
--- a/pkg/constants/dfget_super_code.go
+++ b/pkg/constants/dfget_super_code.go
@@ -74,6 +74,9 @@ const (
 	CodeSourceError    = 610
 	CodeGetPieceReport = 611
 	CodeGetPeerDown    = 612
+
+	CodeNOURL     = 700
+	CodeReturnSrc = 701
 )
 
 /* the code of task result that dfget will report to supernode */
diff --git a/pkg/httputils/http_util.go b/pkg/httputils/http_util.go
index 2e2c60225..e4996f12f 100644
--- a/pkg/httputils/http_util.go
+++ b/pkg/httputils/http_util.go
@@ -454,3 +454,85 @@ func handlePairRange(rangeStr string, length int64) (*RangeStruct, error) {
 		EndIndex:   endIndex,
 	}, nil
 }
+
+// GetTaskIDFromHeader extracts the task ID from the request header named by
+// keyOfTaskID; it returns "" when the header is absent or does not parse to
+// exactly one digest range.
+func GetTaskIDFromHeader(url string, header map[string][]string, keyOfTaskID string) string {
+	hr := http.Header(header)
+	if taskIDHeaderStr := hr.Get(keyOfTaskID); taskIDHeaderStr != "" {
+		ds, err := GetDigestFromHeader(taskIDHeaderStr)
+		if err != nil {
+			return ""
+		}
+
+		// todo: support the merge request
+		if len(ds) != 1 {
+			return ""
+		}
+
+		return ds[0].Digest
+	}
+
+	return ""
+}
+
+// DigestStruct is a single digest together with the byte range it covers.
+type DigestStruct struct {
+	Digest string
+	RangeStruct
+}
+
+// GetDigestFromHeader parses a digest header such as
+// "sha256_1:0,1000;sha256_2:1001,2000" into digest/range pairs.
+func GetDigestFromHeader(digestHeaderStr string) ([]*DigestStruct, error) {
+	var (
+		digest   string
+		rangeStr string
+	)
+
+	result := []*DigestStruct{}
+
+	arr := strings.Split(digestHeaderStr, ";")
+	for _, elem := range arr {
+		kv := strings.Split(elem, ":")
+		if len(kv) > 3 || len(kv) < 2 {
+			return nil, fmt.Errorf("%s is not valid for digestHeader", digestHeaderStr)
+		}
+
+		if len(kv) == 2 {
+			digest = fmt.Sprintf("sha256:%s", kv[0])
+			rangeStr = kv[1]
+		}
+
+		if len(kv) == 3 {
+			digest = fmt.Sprintf("%s:%s", kv[0], kv[1])
+			rangeStr = kv[2]
+		}
+
+		// todo: verify the sha256 string
+
+		rangeIndex := strings.Split(rangeStr, ",")
+		if len(rangeIndex) != 2 {
+			return nil, fmt.Errorf("%s is not valid for digestHeader", digestHeaderStr)
+		}
+
+		startIndex, err := strconv.ParseInt(rangeIndex[0], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("%s is not valid for digestHeader", digestHeaderStr)
+		}
+
+		endIndex, err := strconv.ParseInt(rangeIndex[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("%s is not valid for digestHeader", digestHeaderStr)
+		}
+
+		ds := &DigestStruct{
+			Digest: digest,
+			RangeStruct: RangeStruct{
+				StartIndex: startIndex,
+				EndIndex:   endIndex,
+			},
+		}
+
+		result = append(result, ds)
+	}
+
+	return result, nil
+}
diff --git a/pkg/queue/circle_queue.go b/pkg/queue/circle_queue.go
new file mode 100644
index 000000000..0c3af7c1c
--- /dev/null
+++ b/pkg/queue/circle_queue.go
@@ -0,0 +1,105 @@
+package queue
+
+import (
+	"container/list"
+	"sync"
+
+	"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
+)
+
+type cQElementData struct {
+	key  string
+	data interface{}
+}
+
+// CircleQueue is a fixed-capacity, key-addressable queue with LRU-style
+// eviction from the tail.
+type CircleQueue struct {
+	sync.Mutex
+	capacity int
+
+	itemMap map[string]*list.Element
+	l       *list.List
+}
+
+// NewCircleQueue creates a CircleQueue that holds at most capacity items.
+func NewCircleQueue(capacity int) *CircleQueue {
+	return &CircleQueue{
+		capacity: capacity,
+		itemMap:  make(map[string]*list.Element, capacity),
+		l:        list.New(),
+	}
+}
+
+// PutFront inserts the item at the front, or updates and refreshes the
+// existing entry when the key is already present.
+func (q *CircleQueue) PutFront(key string, data interface{}) {
+	q.Lock()
+	defer q.Unlock()
+
+	if i, ok := q.itemMap[key]; ok {
+		i.Value.(*cQElementData).data = data
+		q.internalPutFront(i)
+		return
+	}
+
+	if 
len(q.itemMap) >= q.capacity {
+		// evict the least recently used item from the tail
+		i := q.internalRemoveTail()
+		if i != nil {
+			delete(q.itemMap, i.Value.(*cQElementData).key)
+		}
+	}
+
+	i := q.internalPutValue(&cQElementData{key: key, data: data})
+	q.itemMap[key] = i
+}
+
+// GetFront returns up to count items from the front without removing them.
+func (q *CircleQueue) GetFront(count int) []interface{} {
+	q.Lock()
+	defer q.Unlock()
+
+	result := make([]interface{}, 0, count)
+	for item := q.l.Front(); item != nil && len(result) < count; item = item.Next() {
+		result = append(result, item.Value.(*cQElementData).data)
+	}
+
+	return result
+}
+
+// GetItemByKey returns the item stored under key, or ErrDataNotFound when
+// the key is absent.
+func (q *CircleQueue) GetItemByKey(key string) (interface{}, error) {
+	q.Lock()
+	defer q.Unlock()
+
+	if data, exist := q.itemMap[key]; exist {
+		return data.Value.(*cQElementData).data, nil
+	}
+
+	return nil, errortypes.ErrDataNotFound
+}
+
+func (q *CircleQueue) internalPutFront(i *list.Element) {
+	q.l.MoveToFront(i)
+}
+
+func (q *CircleQueue) internalPutValue(data interface{}) *list.Element {
+	return q.l.PushFront(data)
+}
+
+func (q *CircleQueue) internalRemoveTail() *list.Element {
+	e := q.l.Back()
+	if e != nil {
+		q.l.Remove(e)
+	}
+
+	return e
+}
diff --git a/pkg/queue/circle_queue_test.go b/pkg/queue/circle_queue_test.go
new file mode 100644
index 000000000..6997059e3
--- /dev/null
+++ b/pkg/queue/circle_queue_test.go
@@ -0,0 +1,62 @@
+package queue
+
+import (
+	"github.com/go-check/check"
+)
+
+func (suite *DFGetUtilSuite) TestCircleQueue(c *check.C) {
+	q := NewCircleQueue(5)
+
+	q.PutFront("key1", 1)
+
+	v1, err := q.GetItemByKey("key1")
+	c.Assert(err, check.IsNil)
+	c.Assert(v1.(int), check.Equals, 1)
+
+	items := q.GetFront(1)
+	c.Assert(len(items), check.Equals, 1)
+	c.Assert(items[0], check.Equals, 1)
+
+	q.PutFront("key2", 2)
+	q.PutFront("key1", 3)
+
+	v1, err = q.GetItemByKey("key1")
+	c.Assert(err, check.IsNil)
+	c.Assert(v1.(int), check.Equals, 3)
+
+	items = q.GetFront(10)
+	c.Assert(len(items), check.Equals, 2)
+	c.Assert(items[0], check.Equals, 3)
+	c.Assert(items[1], check.Equals, 2)
+
+	items = q.GetFront(1)
+	c.Assert(len(items), check.Equals, 1)
+	c.Assert(items[0], check.Equals, 3)
+
+	_, err = q.GetItemByKey("key3")
+	c.Assert(err, check.NotNil)
+
+	q.PutFront("key3", "data3")
+	q.PutFront("key4", "data4")
+	q.PutFront("key5", "data5")
+
+	items = q.GetFront(10)
+	c.Assert(len(items), check.Equals, 5)
+	c.Assert(items[0], check.Equals, "data5")
+	c.Assert(items[1], check.Equals, "data4")
+	c.Assert(items[2], check.Equals, "data3")
+	c.Assert(items[3], check.Equals, 3)
+	c.Assert(items[4], check.Equals, 2)
+
+	q.PutFront("key6", "data6")
+	_, err = q.GetItemByKey("key2")
+	c.Assert(err, check.NotNil)
+
+	items = q.GetFront(5)
+	c.Assert(len(items), check.Equals, 5)
+	c.Assert(items[0], check.Equals, "data6")
+	c.Assert(items[1], check.Equals, "data5")
+	c.Assert(items[2], check.Equals, "data4")
+	c.Assert(items[3], check.Equals, "data3")
+	c.Assert(items[4], check.Equals, 3)
+}
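+
+// Taken together, the assertions above document the intended contract:
+// PutFront inserts or refreshes a key, GetFront peeks from most to least
+// recently used without removing anything, and once the capacity of 5 is
+// exceeded the least recently refreshed key ("key2" here) is evicted.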