Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9583a83
add init
wangforthinker Feb 21, 2020
db3ea61
add code for local manager
wangforthinker Feb 24, 2020
a314b08
add chan for local schedule to adjust peer load
wangforthinker Feb 24, 2020
13601d0
add code for local manager and local downloader
wangforthinker Feb 25, 2020
aa981f7
add NO_URL resp code in Register api
wangforthinker Feb 25, 2020
ff6a4b0
add start sync network info
wangforthinker Feb 25, 2020
e16349c
change local manager to single mode
wangforthinker Feb 25, 2020
a49b32a
if local schedule failed, try to direct return src and update p2p net…
wangforthinker Feb 25, 2020
22dba5c
when download from remote, if direct to source , not to verify the dstIP
wangforthinker Feb 26, 2020
7dd3f1d
fix download resource failed
wangforthinker Feb 26, 2020
1854120
init ratelimiter in power client
wangforthinker Feb 26, 2020
d395916
notify the client stream writer to finish to return
wangforthinker Feb 26, 2020
4cbd748
add length for test
wangforthinker Feb 26, 2020
8e298dc
fix more code to make it work.
wangforthinker Feb 26, 2020
078fad0
add report to uploader
wangforthinker Feb 27, 2020
4d932e3
add some logs, note some code to fast run.
wangforthinker Feb 27, 2020
c260388
note the service down
wangforthinker Feb 27, 2020
e9a01f9
add log for FetchP2PNetworkInfo
wangforthinker Feb 27, 2020
f2bbd20
fix not add taskState when add a new one.
wangforthinker Feb 27, 2020
9ed43a3
init local manager in daemon
wangforthinker Feb 28, 2020
7db37c0
update range including piece meta size if p2p download
wangforthinker Feb 28, 2020
d2f0125
adapt to get range from header
wangforthinker Feb 28, 2020
66fe556
optimize the close progress of client stream
wangforthinker Feb 29, 2020
2518240
add numerical ware
wangforthinker Mar 3, 2020
3bbfb37
add lock in numerical and add auto compute
wangforthinker Mar 3, 2020
5b4b22f
fix numerical get data error.
wangforthinker Mar 3, 2020
92417c8
add reset controll
wangforthinker Mar 3, 2020
2cc1d53
add cache in peer uploader and add unit test.
wangforthinker Mar 4, 2020
2e058b9
add more numerical info
wangforthinker Mar 5, 2020
d9ff274
if url not fetch recently, dirently try to fecth from supernode.
wangforthinker Mar 6, 2020
a8d1401
support local resouce cache to report to supernode
wangforthinker Mar 9, 2020
6bebf54
fix the uploader init failed
wangforthinker Mar 10, 2020
64870ef
prior to get local task in download
wangforthinker Mar 10, 2020
5431aa7
delete task if local task is expired
wangforthinker Mar 11, 2020
1086c73
Merge pull request #6 from antsystem/feat/support-local-task-reload
wangforthinker Mar 11, 2020
6e82829
add heart beat for peer
wangforthinker Mar 11, 2020
91bb821
Merge pull request #7 from antsystem/feat/support-heart-beat
wangforthinker Mar 11, 2020
f241659
change specfical to config
wangforthinker Mar 12, 2020
5b9b471
fix some code by CR
wangforthinker Mar 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,31 @@ definitions:
items:
$ref: "#/definitions/Node"

HeartBeatRequest:
type: "object"
description: ""
properties:
IP:
type: "string"
description: "IP address which peer client carries"
format: "string"
port:
type: "integer"
description: |
when registering, dfget will setup one uploader process.
This one acts as a server for peer pulling tasks.
This port is which this server listens on.
format: "int32"
minimum: 15000
maximum: 65000
cID:
type: "string"
description: |
CID means the client ID. It maps to the specific dfget process.
When user wishes to download an image/file, user would start a dfget process to do this.
This dfget is treated a client and carries a client ID.
Thus, multiple dfget processes on the same peer have different CIDs.

ErrorResponse:
type: "object"
description: |
Expand Down
86 changes: 86 additions & 0 deletions apis/types/heart_beat_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/dfdaemon/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package app

import (
"encoding/json"
"github.com/dragonflyoss/Dragonfly/dfdaemon/localManager"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -68,6 +69,9 @@ var rootCmd = &cobra.Command{
cfgJSON, _ := json.Marshal(cfg)
logrus.Infof("using config: %s", cfgJSON)


_ = localManager.NewLocalManager(cfg.DFGetConfig())

s, err := dfdaemon.NewFromConfig(*cfg)
if err != nil {
return errors.Wrap(err, "create dfdaemon from config")
Expand Down
48 changes: 33 additions & 15 deletions dfdaemon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type Properties struct {
LogConfig dflog.LogConfig `yaml:"logConfig" json:"logConfig"`
LocalIP string `yaml:"localIP" json:"localIP"`
PeerPort int `yaml:"peerPort" json:"peerPort"`

// Extreme is
Extreme *ExtremeConfig `yaml:"extreme" json:"extreme"`
}

// Validate validates the config
Expand Down Expand Up @@ -145,13 +148,14 @@ func (p *Properties) DFGetConfig() DFGetConfig {
}

dfgetConfig := DFGetConfig{
DfgetFlags: dfgetFlags,
SuperNodes: p.SuperNodes,
RateLimit: p.RateLimit.String(),
DFRepo: p.DFRepo,
DFPath: p.DFPath,
LocalIP: p.LocalIP,
PeerPort: p.PeerPort,
DfgetFlags: dfgetFlags,
SuperNodes: p.SuperNodes,
RateLimit: p.RateLimit.String(),
DFRepo: p.DFRepo,
DFPath: p.DFPath,
LocalIP: p.LocalIP,
PeerPort: p.PeerPort,
SpecKeyOfExtremeTaskID: p.Extreme.SpecKeyOfTaskID,
}
if p.HijackHTTPS != nil {
dfgetConfig.HostsConfig = p.HijackHTTPS.Hosts
Expand All @@ -166,19 +170,25 @@ func (p *Properties) DFGetConfig() DFGetConfig {
})
}
}

if dfgetConfig.SpecKeyOfExtremeTaskID == "" {
dfgetConfig.SpecKeyOfExtremeTaskID = constant.DefaultSpecKeyOfExtremeTaskID
}

return dfgetConfig
}

// DFGetConfig configures how dfdaemon calls dfget.
type DFGetConfig struct {
DfgetFlags []string `yaml:"dfget_flags"`
SuperNodes []string `yaml:"supernodes"`
RateLimit string `yaml:"ratelimit"`
DFRepo string `yaml:"localrepo"`
DFPath string `yaml:"dfpath"`
HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"`
PeerPort int `yaml:"peerPort"`
LocalIP string `yaml:"localIP"`
DfgetFlags []string `yaml:"dfget_flags"`
SuperNodes []string `yaml:"supernodes"`
RateLimit string `yaml:"ratelimit"`
DFRepo string `yaml:"localrepo"`
DFPath string `yaml:"dfpath"`
HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"`
PeerPort int `yaml:"peerPort"`
LocalIP string `yaml:"localIP"`
SpecKeyOfExtremeTaskID string `yaml:"specKeyOfTaskID"`
}

// RegistryMirror configures the mirror of the official docker registry
Expand Down Expand Up @@ -412,3 +422,11 @@ func NewProxy(regx string, useHTTPS bool, direct bool, redirect string) (*Proxy,
func (r *Proxy) Match(url string) bool {
return r.Regx != nil && r.Regx.MatchString(url)
}

// ExtremeConfig represents the config of extreme mode
type ExtremeConfig struct {
// SpecKeyOfExtremeTaskID defines the header key which represents the taskID
SpecKeyOfTaskID string `yaml:"specKeyOfTaskID" json:"specKeyOfTaskID"`

SpecKeyOfDirectRet string `yaml:"specKeyOfDirectRet" json:"specKeyOfDirectRet"`
}
4 changes: 4 additions & 0 deletions dfdaemon/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const (
const (
// DefaultConfigPath is the default path of dfdaemon configuration file.
DefaultConfigPath = "/etc/dragonfly/dfdaemon.yml"

// DefaultSpecKeyOfExtremeTaskID is the default value of key which represents
// the taskID in extreme mode.
DefaultSpecKeyOfExtremeTaskID = "X-extreme-key"
)

const (
Expand Down
86 changes: 2 additions & 84 deletions dfdaemon/downloader/p2p/dfclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ package p2p
import (
"context"
"fmt"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"

Expand All @@ -43,11 +40,6 @@ type DFClient struct {
dfClient core.DFGet
}

type DigestStruct struct {
Digest string
httputils.RangeStruct
}

func (c *DFClient) DownloadContext(ctx context.Context, url string, header map[string][]string, name string) (string, error) {
// startTime := time.Now()
dstPath := filepath.Join(c.config.DFRepo, name)
Expand Down Expand Up @@ -107,27 +99,11 @@ func (c *DFClient) doDownload(ctx context.Context, url string, header map[string
runtimeConfig.URL = url
runtimeConfig.RV.TaskURL = url
runtimeConfig.RV.TaskFileName = getTaskFileName(destPath, c.dfGetConfig.Sign)
runtimeConfig.Header = flattenHeader(header)
runtimeConfig.Header = FlattenHeader(header)
runtimeConfig.Output = destPath
runtimeConfig.RV.RealTarget = destPath
runtimeConfig.RV.TargetDir = filepath.Dir(destPath)

hr := http.Header(header)
if digestHeaderStr := hr.Get(dfgetcfg.StrDigest); digestHeaderStr != "" {
ds, err := getDigest(digestHeaderStr)
if err != nil {
return nil, err
}

// todo: support the merge request
if len(ds) != 1 {
return nil, fmt.Errorf("no support to merge request")
}

runtimeConfig.RV.Digest = ds[0].Digest
runtimeConfig.RV.FileLength = (ds[0].EndIndex - ds[0].StartIndex) + 1
}

return c.dfClient.GetReader(ctx, &runtimeConfig)
}

Expand All @@ -139,7 +115,7 @@ func getCid(localIP string, sign string) string {
return localIP + "-" + sign
}

func flattenHeader(header map[string][]string) []string {
func FlattenHeader(header map[string][]string) []string {
var res []string
for key, value := range header {
// discard HTTP host header for backing to source successfully
Expand All @@ -156,61 +132,3 @@ func flattenHeader(header map[string][]string) []string {
}
return res
}

func getDigest(digestHeaderStr string) ([]*DigestStruct, error) {
var (
digest string
rangeStr string
)

// digestHeaderStr looks like "sha256_1:0,1000;sha256_2:1001,2000"

result := []*DigestStruct{}

arr := strings.Split(digestHeaderStr, ";")
for _, elem := range arr {
kv := strings.Split(elem, ":")
if len(kv) > 3 || len(kv) < 2 {
return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr)
}

if len(kv) == 2 {
digest = fmt.Sprintf("sha256:%s", kv[0])
rangeStr = kv[1]
}

if len(kv) == 3 {
digest = fmt.Sprintf("%s:%s", kv[0], kv[1])
rangeStr = kv[2]
}

// todo: verify the sha256 string

rangeIndex := strings.Split(rangeStr, ",")
if len(rangeIndex) != 2 {
return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr)
}

startIndex, err := strconv.ParseInt(rangeIndex[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr)
}

endIndex, err := strconv.ParseInt(rangeIndex[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("%s is not vaild for digestHeader", digestHeaderStr)
}

ds := &DigestStruct{
Digest: digest,
RangeStruct: httputils.RangeStruct{
StartIndex: startIndex,
EndIndex: endIndex,
},
}

result = append(result, ds)
}

return result, nil
}
2 changes: 1 addition & 1 deletion dfdaemon/downloader/p2p/dfclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (tc *testcase) WithValue(key string, value ...string) *testcase {
func (tc *testcase) Test(t *testing.T) {
a := assert.New(t)

actualResult := flattenHeader(tc.header)
actualResult := FlattenHeader(tc.header)

for _, r := range actualResult {
if _, ok := tc.result[r]; !ok {
Expand Down
12 changes: 12 additions & 0 deletions dfdaemon/localManager/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package localManager

import "github.com/dragonflyoss/Dragonfly/pkg/syncmap"

type cache struct {

}

type cacheManager struct {
// key is taskID, value is the
taskContainer *syncmap.SyncMap
}
Loading