Skip to content

Commit fd189cf

Browse files
committed
have added pg support
1 parent ca02cfc commit fd189cf

File tree

14 files changed

+588
-58
lines changed

14 files changed

+588
-58
lines changed

context.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,5 @@ func (c *taskContext) SetState(state any) error {
5454
return err
5555
}
5656

57-
return c.taskEngine.SetState(c.ctx, c.task.ID, b)
57+
return c.taskEngine.SetState(c.ctx, c.task, b)
5858
}

engine.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,21 @@ type Engine interface {
1111
}
1212

1313
type TaskEngine interface {
14-
GetRelatedTask(ctx context.Context, task string, id string) (*Task, error)
14+
GetRelatedTask(ctx context.Context, task *Task) (*Task, error)
1515
FindNextTask(ctx context.Context, statuses []string) (*Task, error)
16-
ReleaseTask(ctx context.Context, id string) error
17-
WaitTaskForSubtasks(ctx context.Context, id string) error
16+
ReleaseTask(ctx context.Context, task *Task) error
17+
WaitTaskForSubtasks(ctx context.Context, task *Task) error
1818
FailTask(ctx context.Context, task *Task, err error, timeout time.Duration) error
1919
CreateTask(ctx context.Context, task *Task) error
20-
GetUnreleasedTaskChildren(ctx context.Context, id string) (*Task, error)
21-
SetState(ctx context.Context, id string, state []byte) error
20+
GetUnreleasedTaskChildren(ctx context.Context, task *Task) (*Task, error)
21+
SetState(ctx context.Context, task *Task, state []byte) error
2222
}
2323

2424
type JobEngine interface {
2525
FindNextJob(ctx context.Context, statuses []string) (*Job, error)
26-
GetUnreleasedJobChildren(ctx context.Context, jobName string) (*Task, error)
27-
Release(ctx context.Context, jobName string, nextLaunchAt time.Time) error
28-
WaitForSubtasks(ctx context.Context, jobName string) error
26+
GetUnreleasedJobChildren(ctx context.Context, name string) (*Task, error)
27+
Release(ctx context.Context, job *Job, nextLaunchAt time.Time) error
28+
WaitForSubtasks(ctx context.Context, job *Job) error
2929
RegisterJob(ctx context.Context, cfg *JobConfig) error
30+
Init(ctx context.Context) error
3031
}

engines/mongo/mongo.go

+19-14
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ func (m *Mongo) CreateTask(ctx context.Context, task *stepper.Task) error {
6262
return err
6363
}
6464

65-
func (m *Mongo) SetState(ctx context.Context, id string, state []byte) error {
66-
query := bson.M{"id": id}
65+
func (m *Mongo) SetState(ctx context.Context, task *stepper.Task, state []byte) error {
66+
query := bson.M{"id": task.ID}
6767
update := bson.M{"$set": bson.M{"state": state}}
6868

6969
if err := m.tasks.FindOneAndUpdate(ctx, query, update).Err(); err != nil {
@@ -164,12 +164,12 @@ func (m *Mongo) GetUnreleasedJobChildren(ctx context.Context, jobId string) (*st
164164
return task.ToModel(), nil
165165
}
166166

167-
func (m *Mongo) GetUnreleasedTaskChildren(ctx context.Context, id string) (*stepper.Task, error) {
167+
func (m *Mongo) GetUnreleasedTaskChildren(ctx context.Context, forTask *stepper.Task) (*stepper.Task, error) {
168168
var task Task
169169

170170
query := bson.M{
171171
"status": bson.M{"$in": []string{"created", "in_progress"}},
172-
"parent": id,
172+
"parent": forTask.ID,
173173
}
174174

175175
if err := m.tasks.FindOne(ctx, query).Decode(&task); err != nil {
@@ -183,8 +183,8 @@ func (m *Mongo) GetUnreleasedTaskChildren(ctx context.Context, id string) (*step
183183
return task.ToModel(), nil
184184
}
185185

186-
func (m *Mongo) GetRelatedTask(ctx context.Context, task string, id string) (*stepper.Task, error) {
187-
query := bson.M{"custom_id": id, "name": task, "status": bson.M{"$ne": "released"}}
186+
func (m *Mongo) GetRelatedTask(ctx context.Context, task *stepper.Task) (*stepper.Task, error) {
187+
query := bson.M{"custom_id": task.ID, "name": task.Name, "status": bson.M{"$ne": "released"}}
188188

189189
var e Task
190190

@@ -199,10 +199,10 @@ func (m *Mongo) GetRelatedTask(ctx context.Context, task string, id string) (*st
199199
return e.ToModel(), nil
200200
}
201201

202-
func (m *Mongo) Release(ctx context.Context, name string, nextTimeLaunch time.Time) error {
202+
func (m *Mongo) Release(ctx context.Context, job *stepper.Job, nextTimeLaunch time.Time) error {
203203
return m.jobs.FindOneAndUpdate(
204204
ctx,
205-
bson.M{"name": name},
205+
bson.M{"name": job.Name},
206206
bson.M{"$set": bson.M{
207207
"lock_at": nil,
208208
"status": "released",
@@ -231,21 +231,21 @@ func (m *Mongo) FailTask(ctx context.Context, task *stepper.Task, handlerErr err
231231
).Err()
232232
}
233233

234-
func (m *Mongo) ReleaseTask(ctx context.Context, id string) error {
234+
func (m *Mongo) ReleaseTask(ctx context.Context, task *stepper.Task) error {
235235
return m.tasks.FindOneAndUpdate(
236236
ctx,
237-
bson.M{"id": id},
237+
bson.M{"id": task.ID},
238238
bson.M{"$set": bson.M{
239239
"lock_at": nil,
240240
"status": "released",
241241
}},
242242
).Err()
243243
}
244244

245-
func (m *Mongo) WaitForSubtasks(ctx context.Context, name string) error {
245+
func (m *Mongo) WaitForSubtasks(ctx context.Context, job *stepper.Job) error {
246246
return m.jobs.FindOneAndUpdate(
247247
ctx,
248-
bson.M{"name": name},
248+
bson.M{"name": job.Name},
249249
bson.M{"$set": bson.M{
250250
"lock_at": nil,
251251
"status": "waiting",
@@ -254,14 +254,19 @@ func (m *Mongo) WaitForSubtasks(ctx context.Context, name string) error {
254254
).Err()
255255
}
256256

257-
func (m *Mongo) WaitTaskForSubtasks(ctx context.Context, id string) error {
257+
func (m *Mongo) WaitTaskForSubtasks(ctx context.Context, task *stepper.Task) error {
258258
return m.tasks.FindOneAndUpdate(
259259
ctx,
260-
bson.M{"id": id},
260+
bson.M{"id": task.ID},
261261
bson.M{"$set": bson.M{
262262
"lock_at": nil,
263263
"status": "waiting",
264264
"launchAt": time.Now().Add(time.Second * 1),
265265
}},
266266
).Err()
267267
}
268+
269+
// TODO add indexes
270+
func (m *Mongo) Init(ctx context.Context) error {
271+
return nil
272+
}

engines/pg/job.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package pg
2+
3+
type Job struct {
4+
Status string `json:"status"`
5+
Name string `json:"name"`
6+
Pattern string `json:"pattern"`
7+
NextLaunchAt int64 `json:"naxtLaunchAt"`
8+
}

0 commit comments

Comments
 (0)