From d408698979cad1735cdec4a3b4e0bb76baeeeb29 Mon Sep 17 00:00:00 2001 From: seven <2260006962@qq.com> Date: Thu, 3 Jul 2025 17:24:07 +0800 Subject: [PATCH] feat: syncer task --- internal/bootstrap/task.go | 1 + internal/db/db.go | 3 +- internal/db/syncer.go | 38 ++++ internal/fs/fs.go | 8 + internal/fs/syncer.go | 395 +++++++++++++++++++++++++++++++++++++ internal/fs/walk.go | 13 +- internal/model/syncer.go | 31 +++ internal/op/syncer.go | 36 ++++ server/handles/syncer.go | 149 ++++++++++++++ server/router.go | 12 ++ 10 files changed, 681 insertions(+), 5 deletions(-) create mode 100644 internal/db/syncer.go create mode 100644 internal/fs/syncer.go create mode 100644 internal/model/syncer.go create mode 100644 internal/op/syncer.go create mode 100644 server/handles/syncer.go diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index 0ace27cf5..2aaae85d7 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -30,6 +30,7 @@ func InitTaskManager() { op.RegisterSettingChangingCallback(func() { fs.MoveTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers))) }) + fs.SyncTaskManager = tache.NewManager[*fs.SyncTask](tache.WithWorks(1), tache.WithMaxRetry(0)) tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(setting.GetInt(conf.TaskOfflineDownloadThreadsNum, conf.Conf.Tasks.Download.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant), db.UpdateTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry)) op.RegisterSettingChangingCallback(func() { tool.DownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskOfflineDownloadThreadsNum, conf.Conf.Tasks.Download.Workers))) diff --git a/internal/db/db.go b/internal/db/db.go index 2299a1a36..a9f335574 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -12,7 +12,8 @@ var db *gorm.DB func Init(d *gorm.DB) { db = d - err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey)) + err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), + new(model.TaskItem), new(model.SSHPublicKey), new(model.SyncerTaskArgs)) if err != nil { log.Fatalf("failed migrate database: %s", err.Error()) } diff --git a/internal/db/syncer.go b/internal/db/syncer.go new file mode 100644 index 000000000..2e6a02704 --- /dev/null +++ b/internal/db/syncer.go @@ -0,0 +1,38 @@ +package db + +import ( + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/pkg/errors" +) + +func CreateSyncerTaskArgs(syncerTaskArgs *model.SyncerTaskArgs) error { + return errors.WithStack(db.Create(syncerTaskArgs).Error) +} + +func UpdateSyncerTaskArgs(syncerTaskArgs *model.SyncerTaskArgs) error { + return errors.WithStack(db.Save(syncerTaskArgs).Error) +} + +func DeleteSyncerTaskArgsById(id uint) error { + return errors.WithStack(db.Delete(&model.SyncerTaskArgs{}, id).Error) +} + +func GetSyncerTaskArgsById(id uint) (*model.SyncerTaskArgs, error) { + var syncerTaskArgs model.SyncerTaskArgs + syncerTaskArgs.ID = id + if err := db.First(&syncerTaskArgs).Error; err != nil { + return nil, errors.WithStack(err) + } + return &syncerTaskArgs, nil +} + +func GetSyncerTaskArgs(pageIndex, pageSize int) (syncTaskArgList []model.SyncerTaskArgs, count int64, err error) { + syncerTaskArgsDB := db.Model(&model.SyncerTaskArgs{}) + if err := syncerTaskArgsDB.Count(&count).Error; err != nil { + return nil, 0, errors.Wrapf(err, "failed get syncTaskArgList count") + } + if err := syncerTaskArgsDB.Order(columnName("id")).Offset((pageIndex - 1) * pageSize).Limit(pageSize).Find(&syncTaskArgList).Error; err != nil { + return nil, 0, errors.Wrapf(err, "failed get find syncTaskArgList") + } + return syncTaskArgList, count, nil +} diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 297e73516..b8a03243a 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -204,3 +204,11 @@ func PutURL(ctx context.Context, path, dstName, urlStr string) error { } return op.PutURL(ctx, storage, dstDirActualPath, dstName, urlStr) } + +func Syncer(ctx context.Context, syncerArgs model.SyncerTaskArgs) (task.TaskExtensionInfo, error) { + res, err := _syncer(ctx, syncerArgs) + if err != nil { + log.Errorf("failed sync %s to %s: %+v", syncerArgs.SrcPath, syncerArgs.DstPath, err) + } + return res, err +} diff --git a/internal/fs/syncer.go b/internal/fs/syncer.go new file mode 100644 index 000000000..fc44fd61d --- /dev/null +++ b/internal/fs/syncer.go @@ -0,0 +1,395 @@ +package fs + +import ( + "context" + "fmt" + stdpath "path" + "path/filepath" + "sort" + "strconv" + "sync" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/OpenListTeam/tache" + "github.com/pkg/errors" +) + +type SyncTask struct { + task.TaskExtension + Status string `json:"-"` + TaskName string `json:"task_name"` + SrcPath string `json:"src_path"` + DstPath string `json:"dst_path"` + TaskType string `json:"task_type" default:"copy"` + lazyCache bool + srcStorage driver.Driver + dstStorage driver.Driver + onlyInSrc []entry + onlyInDst []entry + childTasks []task.TaskExtensionInfo + ChildTaskInfos []model.SyncerChildTaskInfo + mu sync.RWMutex +} + +type entry struct { + relPath string + obj model.Obj +} + +func (t *SyncTask) GetName() string { + return fmt.Sprintf("%s sync [%s] to [%s] with %s mode", t.TaskName, t.SrcPath, t.DstPath, t.TaskType) +} + +func (t *SyncTask) GetStatus() string { + return t.Status +} + +func (t *SyncTask) Cancel() { + t.SetCancelFunc(func() { + for _, childTask := range t.childTasks { + childTask.Cancel() + } + }) + t.TaskExtension.Cancel() +} + +func (t *SyncTask) Run() error { + if err := t.ReinitCtx(); err != nil { + return err + } + t.ClearEndTime() + t.SetStartTime(time.Now()) + defer func() { t.SetEndTime(time.Now()) }() + var err error + if t.srcStorage == nil { + t.srcStorage, err = op.GetStorageByMountPath(t.SrcPath) + } + if t.dstStorage == nil { + t.dstStorage, err = op.GetStorageByMountPath(t.DstPath) + } + if err != nil { + return errors.WithMessage(err, "failed get storage") + } + return startSync(t) +} + +var SyncTaskManager *tache.Manager[*SyncTask] + +const maxDepth int = 10 + +func _syncer(ctx context.Context, syncerArgs model.SyncerTaskArgs) (task.TaskExtensionInfo, error) { + srcPath := syncerArgs.SrcPath + dstPath := syncerArgs.DstPath + srcStorage, _, err := op.GetStorageAndActualPath(srcPath) + if err != nil { + return nil, errors.WithMessage(err, "failed get src storage") + } + dstStorage, _, err := op.GetStorageAndActualPath(dstPath) + if err != nil { + return nil, errors.WithMessage(err, "failed get dst storage") + } + taskCreator, _ := ctx.Value("user").(*model.User) + t := &SyncTask{ + TaskExtension: task.TaskExtension{ + Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), + }, + TaskName: syncerArgs.TaskName, + SrcPath: srcPath, + DstPath: dstPath, + TaskType: syncerArgs.TaskType, + lazyCache: syncerArgs.LazyCache, + srcStorage: srcStorage, + dstStorage: dstStorage, + } + t.SetID(strconv.Itoa(int(syncerArgs.ID))) + SyncTaskManager.Add(t) + return t, nil +} + +func startSync(t *SyncTask) error { + allSrcFiles, allDstFiles, err := t.getAllFile() + if err != nil { + return err + } + compare(t, allSrcFiles, allDstFiles) + t.Status = "Handling source directory between target directory differences" + err = t.handleSrcDiff() + if err != nil { + return err + } + err = t.handleDstDiff() + if err != nil { + return err + } + + t.waitForChildTasks() + t.Status = "Syncer finish" + return nil +} + +func (t *SyncTask) getAllFile() (map[string]model.Obj, map[string]model.Obj, error) { + getFileFn := func(path string, lazyCache bool) (map[string]model.Obj, error) { + result := make(map[string]model.Obj) + walkFn := func(childPath string, info model.Obj) error { + relPath, err := filepath.Rel(path, childPath) + if err != nil { + return err + } + result[relPath] = info + return nil + } + fi, err := Get(t.Ctx(), path, &GetArgs{NoLog: true}) + if err != nil { + return nil, err + } + err = WalkFSWithRefresh(t.Ctx(), maxDepth, path, fi, !lazyCache, walkFn) + if err != nil { + return nil, err + } + return result, nil + } + t.Status = "getting src files" + allSrcFiles, err := getFileFn(t.SrcPath, t.lazyCache) + if err != nil { + return nil, nil, err + } + t.Status = "getting dst files" + allDstFiles, err := getFileFn(t.DstPath, t.lazyCache) + if err != nil { + return nil, nil, err + } + return allSrcFiles, allDstFiles, nil +} + +func compare(t *SyncTask, srcFiles, dstFiles map[string]model.Obj) { + compareFn := func(srcMap, dstMap map[string]model.Obj) []entry { + var result []entry + for path, obj := range srcMap { + if _, ok := dstMap[path]; !ok { + result = append(result, entry{path, obj}) + } + } + // 排序:目录优先 + sort.Slice(result, func(i, j int) bool { + return result[i].obj.IsDir() && !result[j].obj.IsDir() + }) + return result + } + t.Status = "Comparing the difference between source directory and target directory" + t.onlyInSrc = compareFn(srcFiles, dstFiles) + t.onlyInDst = compareFn(dstFiles, srcFiles) +} + +func (t *SyncTask) handleSrcDiff() error { + switch t.TaskType { + case model.Copy, model.CopyAndDelete, model.TwoWaySync: + for _, e := range t.onlyInSrc { + src := stdpath.Join(t.SrcPath, e.relPath) + dst := stdpath.Join(t.DstPath, e.relPath) + err := t.copy(e.obj, src, dst) + if err != nil { + return err + } + } + case model.Move, model.MoveAndDelete: + for _, e := range t.onlyInSrc { + src := stdpath.Join(t.SrcPath, e.relPath) + dst := stdpath.Join(t.DstPath, e.relPath) + err := t.move(e.obj, src, dst) + if err != nil { + return err + } + } + } + return nil +} + +func (t *SyncTask) handleDstDiff() error { + switch t.TaskType { + case model.CopyAndDelete, model.MoveAndDelete, model.Delete: + for _, e := range t.onlyInDst { + err := t.remove(e.relPath) + if err != nil { + return err + } + } + case model.TwoWaySync: + for _, e := range t.onlyInDst { + src := stdpath.Join(t.DstPath, e.relPath) + dst := stdpath.Join(t.SrcPath, e.relPath) + err := t.copy(e.obj, src, dst) + if err != nil { + return err + } + } + } + return nil +} + +func (t *SyncTask) copy(obj model.Obj, src string, dst string) error { + if obj.IsDir() { + err := MakeDir(t.Ctx(), dst, t.lazyCache) + if err != nil { + return err + } + } else { + dst = filepath.Dir(dst) + copyTask, err := Copy(t.Ctx(), src, dst) + if err != nil { + return err + } + if copyTask != nil { + t.childTasks = append(t.childTasks, copyTask) + } else { + t.ChildTaskInfos = append(t.ChildTaskInfos, model.SyncerChildTaskInfo{ + TaskType: model.Copy, + SrcPath: src, + DstPath: dst, + State: tache.StateSucceeded, + Progress: 100, + }) + } + } + return nil +} + +func (t *SyncTask) move(obj model.Obj, src string, dst string) error { + if obj.IsDir() { + err := MakeDir(t.Ctx(), dst, t.lazyCache) + if err != nil { + return err + } + } else { + dst = filepath.Dir(dst) + moveTask, err := MoveWithTaskAndValidation(t.Ctx(), src, dst, true, t.lazyCache) + if err != nil { + return err + } + if moveTask != nil { + t.childTasks = append(t.childTasks, moveTask) + } else { + t.ChildTaskInfos = append(t.ChildTaskInfos, model.SyncerChildTaskInfo{ + TaskType: model.Move, + SrcPath: src, + DstPath: dst, + State: tache.StateSucceeded, + Progress: 100, + }) + } + } + return nil +} + +func (t *SyncTask) remove(relPath string) error { + err := Remove(t.Ctx(), stdpath.Join(t.DstPath, relPath)) + if err != nil { + return err + } else { + t.ChildTaskInfos = append(t.ChildTaskInfos, model.SyncerChildTaskInfo{ + TaskType: model.Delete, + DeletePath: stdpath.Join(t.DstPath, relPath), + State: tache.StateSucceeded, + Progress: 100, + }) + } + return nil +} + +func (t *SyncTask) waitForChildTasks() { + var wg sync.WaitGroup + progressCh := make(chan struct{}, len(t.childTasks)) + + total := len(t.childTasks) + if total == 0 { + t.SetProgress(100) + return + } + + for _, child := range t.childTasks { + wg.Add(1) + + go func(task task.TaskExtensionInfo) { + defer wg.Done() + interval := 300 * time.Millisecond + maxInterval := 2 * time.Second + increaseStep := 200 * time.Millisecond + + for { + state := task.GetState() + t.updateOrAddChildTaskInfo(task) + if utils.SliceContains([]tache.State{ + tache.StateSucceeded, + tache.StateFailed, + tache.StateCanceled, + }, state) { + progressCh <- struct{}{} + return + } + time.Sleep(interval) + if interval < maxInterval { + interval += increaseStep + if interval > maxInterval { + interval = maxInterval + } + } + } + }(child) + } + + go func() { + completed := 0 + for range progressCh { + completed++ + t.SetProgress(float64(completed) / float64(total) * 100) + if completed == total { + close(progressCh) + } + } + }() + + wg.Wait() +} + +func (t *SyncTask) updateOrAddChildTaskInfo(task task.TaskExtensionInfo) { + t.mu.Lock() + defer t.mu.Unlock() + + for i, info := range t.ChildTaskInfos { + if info.TaskId == task.GetID() { + t.ChildTaskInfos[i].State = task.GetState() + return + } + } + + if copyTask, ok := task.(*CopyTask); ok { + t.ChildTaskInfos = append(t.ChildTaskInfos, model.SyncerChildTaskInfo{ + TaskId: copyTask.GetID(), + TaskType: model.Copy, + SrcPath: stdpath.Join(t.srcStorage.GetStorage().MountPath, copyTask.SrcObjPath), + DstPath: stdpath.Join(t.dstStorage.GetStorage().MountPath, copyTask.DstDirPath), + State: task.GetState(), + Progress: task.GetProgress(), + }) + return + } + + if moveTask, ok := task.(*MoveTask); ok { + t.ChildTaskInfos = append(t.ChildTaskInfos, model.SyncerChildTaskInfo{ + TaskId: moveTask.GetID(), + TaskType: model.Move, + SrcPath: moveTask.SrcObjPath, + DstPath: moveTask.DstDirPath, + State: task.GetState(), + Progress: task.GetProgress(), + }) + return + } + +} diff --git a/internal/fs/walk.go b/internal/fs/walk.go index 22af85062..d4bbc75e0 100644 --- a/internal/fs/walk.go +++ b/internal/fs/walk.go @@ -2,6 +2,7 @@ package fs import ( "context" + "errors" "path" "path/filepath" @@ -15,10 +16,14 @@ import ( // WalkFS calls walkFn. If a visited file system node is a directory and // walkFn returns path.SkipDir, walkFS will skip traversal of this node. func WalkFS(ctx context.Context, depth int, name string, info model.Obj, walkFn func(reqPath string, info model.Obj) error) error { + return WalkFSWithRefresh(ctx, depth, name, info, false, walkFn) +} + +func WalkFSWithRefresh(ctx context.Context, depth int, name string, info model.Obj, refresh bool, walkFn func(reqPath string, info model.Obj) error) error { // This implementation is based on Walk's code in the standard path/path package. walkFnErr := walkFn(name, info) if walkFnErr != nil { - if info.IsDir() && walkFnErr == filepath.SkipDir { + if info.IsDir() && errors.Is(walkFnErr, filepath.SkipDir) { return nil } return walkFnErr @@ -28,14 +33,14 @@ func WalkFS(ctx context.Context, depth int, name string, info model.Obj, walkFn } meta, _ := op.GetNearestMeta(name) // Read directory names. - objs, err := List(context.WithValue(ctx, "meta", meta), name, &ListArgs{}) + objs, err := List(context.WithValue(ctx, "meta", meta), name, &ListArgs{Refresh: refresh}) if err != nil { return walkFnErr } for _, fileInfo := range objs { filename := path.Join(name, fileInfo.GetName()) - if err := WalkFS(ctx, depth-1, filename, fileInfo, walkFn); err != nil { - if err == filepath.SkipDir { + if err := WalkFSWithRefresh(ctx, depth-1, filename, fileInfo, refresh, walkFn); err != nil { + if errors.Is(err, filepath.SkipDir) { break } return err diff --git a/internal/model/syncer.go b/internal/model/syncer.go new file mode 100644 index 000000000..41a955c6b --- /dev/null +++ b/internal/model/syncer.go @@ -0,0 +1,31 @@ +package model + +import "github.com/OpenListTeam/tache" + +const ( + Copy string = "copy" + Move string = "move" + Delete string = "delete" + CopyAndDelete string = "copy_and_delete" + MoveAndDelete string = "move_and_delete" + TwoWaySync string = "two_way_sync" +) + +type SyncerTaskArgs struct { + ID uint `json:"id" gorm:"primaryKey"` + TaskName string `json:"task_name"` + SrcPath string `json:"src_path"` + DstPath string `json:"dst_path"` + TaskType string `json:"task_type" default:"copy"` + LazyCache bool `json:"lazy_cache" default:"true"` +} + +type SyncerChildTaskInfo struct { + TaskId string `json:"task_id"` + TaskType string `json:"task_type" ` + SrcPath string `json:"src_path"` + DstPath string `json:"dst_path"` + DeletePath string `json:"delete_path"` + State tache.State + Progress float64 +} diff --git a/internal/op/syncer.go b/internal/op/syncer.go new file mode 100644 index 000000000..8a89615fe --- /dev/null +++ b/internal/op/syncer.go @@ -0,0 +1,36 @@ +package op + +import ( + "github.com/OpenListTeam/OpenList/v4/internal/db" + "github.com/OpenListTeam/OpenList/v4/internal/model" +) + +func CreateSyncerTaskArgs(syncerTaskArgs *model.SyncerTaskArgs) error { + return db.CreateSyncerTaskArgs(syncerTaskArgs) +} + +func DeleteSyncerTaskArgsById(id uint) error { + _, err := db.GetSyncerTaskArgsById(id) + if err != nil { + return err + } + // todo 判断任务是否正在运行 + return db.DeleteSyncerTaskArgsById(id) +} + +func UpdateSyncerTaskArgs(syncerTaskArgs *model.SyncerTaskArgs) error { + _, err := db.GetSyncerTaskArgsById(syncerTaskArgs.ID) + if err != nil { + return err + } + // todo 判断任务是否正在运行 + return db.UpdateSyncerTaskArgs(syncerTaskArgs) +} + +func GetTaskArgsById(id uint) (*model.SyncerTaskArgs, error) { + return db.GetSyncerTaskArgsById(id) +} + +func GetTaskArgs(pageIndex, pageSize int) (users []model.SyncerTaskArgs, count int64, err error) { + return db.GetSyncerTaskArgs(pageIndex, pageSize) +} diff --git a/server/handles/syncer.go b/server/handles/syncer.go new file mode 100644 index 000000000..c615e19fa --- /dev/null +++ b/server/handles/syncer.go @@ -0,0 +1,149 @@ +package handles + +import ( + "fmt" + log "github.com/sirupsen/logrus" + "strconv" + + "github.com/OpenListTeam/OpenList/v4/internal/fs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/gin-gonic/gin" +) + +func ListSyncerTask(c *gin.Context) { + var req model.PageReq + if err := c.ShouldBind(&req); err != nil { + common.ErrorResp(c, err, 400) + return + } + req.Validate() + log.Debugf("%+v", req) + syncerTasks, total, err := op.GetTaskArgs(req.Page, req.PerPage) + if err != nil { + common.ErrorResp(c, err, 500, true) + return + } + common.SuccessResp(c, common.PageResp{ + Content: syncerTasks, + Total: total, + }) +} + +func CreateSyncerTask(c *gin.Context) { + var req model.SyncerTaskArgs + if err := c.ShouldBind(&req); err != nil { + common.ErrorResp(c, err, 400) + return + } + if err := op.CreateSyncerTaskArgs(&req); err != nil { + common.ErrorResp(c, err, 500, true) + } else { + common.SuccessResp(c) + } +} + +func UpdateSyncerTask(c *gin.Context) { + var req model.SyncerTaskArgs + if err := c.ShouldBind(&req); err != nil { + common.ErrorResp(c, err, 400) + return + } + _, err := op.GetTaskArgsById(req.ID) + if err != nil { + common.ErrorResp(c, err, 500) + return + } + if err := op.UpdateSyncerTaskArgs(&req); err != nil { + common.ErrorResp(c, err, 500) + } else { + common.SuccessResp(c) + } +} + +func DeleteSyncerTask(c *gin.Context) { + idStr := c.Query("id") + id, err := strconv.Atoi(idStr) + if err != nil { + common.ErrorResp(c, err, 400) + return + } + if err := op.DeleteSyncerTaskArgsById(uint(id)); err != nil { + common.ErrorResp(c, err, 500) + return + } + common.SuccessResp(c) +} + +func GetSyncerTask(c *gin.Context) { + idStr := c.Query("id") + id, err := strconv.Atoi(idStr) + if err != nil { + common.ErrorResp(c, err, 400) + return + } + syncerTask, err := op.GetTaskArgsById(uint(id)) + if err != nil { + common.ErrorResp(c, err, 500, true) + return + } + common.SuccessResp(c, syncerTask) +} + +func RunSyncer(c *gin.Context) { + idStr := c.Query("id") + id, err := strconv.Atoi(idStr) + if err != nil { + common.ErrorResp(c, err, 400) + return + } + syncerTask, err := op.GetTaskArgsById(uint(id)) + if err != nil { + common.ErrorStrResp(c, "syncer task not find", 500) + return + } + + var addedTasks []task.TaskExtensionInfo + + t, err := fs.Syncer(c, *syncerTask) + if t != nil { + addedTasks = append(addedTasks, t) + } + if err != nil { + common.ErrorResp(c, err, 500) + return + } + + if len(addedTasks) > 0 { + common.SuccessResp(c, gin.H{ + "message": fmt.Sprintf("Successfully created %d sync task(s)", len(addedTasks)), + "tasks": getTaskInfos(addedTasks), + }) + } else { + common.SuccessResp(c, gin.H{ + "message": "No sync tasks were added", + }) + } +} + +func CancelSyncer(c *gin.Context) { + id := c.Query("id") + if len(id) == 0 { + common.ErrorStrResp(c, "id is required", 400) + } + fs.SyncTaskManager.Cancel(id) + common.SuccessResp(c) +} + +func GetTaskInfo(c *gin.Context) { + id := c.Query("id") + if len(id) == 0 { + allTask := fs.SyncTaskManager.GetAll() + common.SuccessResp(c, allTask) + } else { + syncTaskInfo, _ := fs.SyncTaskManager.GetByID(id) + common.SuccessResp(c, syncTaskInfo) + } +} diff --git a/server/router.go b/server/router.go index 5bc8469db..ff68bad0f 100644 --- a/server/router.go +++ b/server/router.go @@ -92,6 +92,7 @@ func Init(e *gin.Engine) { _fs(auth.Group("/fs")) _task(auth.Group("/task", middlewares.AuthNotGuest)) + _syncer(auth.Group("/syncer", middlewares.AuthNotGuest)) admin(auth.Group("/admin", middlewares.AuthAdmin)) if flags.Debug || flags.Dev { debug(g.Group("/debug")) @@ -210,3 +211,14 @@ func InitS3(e *gin.Engine) { Cors(e) S3Server(e.Group("/")) } + +func _syncer(g *gin.RouterGroup) { + g.GET("/list", handles.ListSyncerTask) + g.GET("/get", handles.GetSyncerTask) + g.POST("/create", handles.CreateSyncerTask) + g.POST("/delete", handles.DeleteSyncerTask) + g.POST("/update", handles.UpdateSyncerTask) + g.POST("/run", handles.RunSyncer) + g.POST("/cancel", handles.CancelSyncer) + g.GET("/taskInfo", handles.GetTaskInfo) +}