Skip to content

Commit e409f82

Browse files
committedNov 7, 2022
initial
0 parents  commit e409f82

21 files changed

+1320
-0
lines changed
 

‎context.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package stepper
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
)
8+
9+
type Context interface {
10+
Task() *Task
11+
Context() context.Context
12+
CreateSubtask(sub CreateTask)
13+
BindState(state any) error
14+
SetState(state any) error
15+
SetRetryAfter(timeout time.Duration)
16+
}
17+
18+
type taskContext struct {
19+
ctx context.Context
20+
task *Task
21+
subtasks []CreateTask
22+
retryAfter time.Duration
23+
24+
taskEngine Engine
25+
}
26+
27+
func (c *taskContext) Task() *Task {
28+
return c.task
29+
}
30+
31+
func (c *taskContext) Context() context.Context {
32+
return c.ctx
33+
}
34+
35+
func (c *taskContext) CreateSubtask(sub CreateTask) {
36+
c.subtasks = append(c.subtasks, sub)
37+
}
38+
39+
func (c *taskContext) SetRetryAfter(timeout time.Duration) {
40+
c.retryAfter = timeout
41+
}
42+
43+
func (c *taskContext) BindState(state any) error {
44+
if len(c.task.State) == 0 {
45+
return nil
46+
}
47+
48+
return json.Unmarshal(c.task.State, state)
49+
}
50+
51+
func (c *taskContext) SetState(state any) error {
52+
b, err := json.Marshal(state)
53+
if err != nil {
54+
return err
55+
}
56+
57+
c.taskEngine.SetState(c.ctx, c.task.ID, b)
58+
59+
return nil
60+
}

‎engine.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package stepper
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type Engine interface {
9+
GetRelatedTask(ctx context.Context, task string, id string) (*Task, error)
10+
FindNextTask(ctx context.Context, statuses []string) (*Task, error)
11+
ReleaseTask(ctx context.Context, id string) error
12+
WaitTaskForSubtasks(ctx context.Context, id string) error
13+
FailTask(ctx context.Context, id string, err error, timeout time.Duration) error
14+
CreateTask(ctx context.Context, task *Task) error
15+
GetUnreleasedTaskChildren(ctx context.Context, id string) (*Task, error)
16+
SetState(ctx context.Context, id string, state []byte) error
17+
}
18+
19+
type JobEngine interface {
20+
FindNextJob(ctx context.Context, statuses []string) (*Job, error)
21+
GetUnreleasedJobChildren(ctx context.Context, jobName string) (*Task, error)
22+
Release(ctx context.Context, jobName string, nextLaunchAt time.Time) error
23+
WaitForSubtasks(ctx context.Context, jobName string) error
24+
RegisterJob(ctx context.Context, cfg *JobConfig) error
25+
}

‎engines/mongo/job.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package mongo
2+
3+
import (
4+
"time"
5+
6+
"github.com/matroskin13/stepper"
7+
)
8+
9+
type job struct {
10+
Status string `bson:"status"`
11+
Name string `bson:"name"`
12+
Pattern string `bson:"pattern"`
13+
NextLaunchAt time.Time `bson:"naxtLaunchAt"`
14+
}
15+
16+
func (j *job) FromModel(model *stepper.Job) {
17+
j.Status = model.Status
18+
j.Name = model.Name
19+
j.Pattern = model.Pattern
20+
j.NextLaunchAt = model.NextLaunchAt
21+
}
22+
23+
func (j *job) ToModel() *stepper.Job {
24+
return &stepper.Job{
25+
Status: j.Status,
26+
Name: j.Name,
27+
Pattern: j.Pattern,
28+
NextLaunchAt: j.NextLaunchAt,
29+
}
30+
}

‎engines/mongo/mongo.go

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/matroskin13/stepper"
8+
9+
"go.mongodb.org/mongo-driver/bson"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
"go.mongodb.org/mongo-driver/mongo/options"
12+
)
13+
14+
type Mongo struct {
15+
jobs *mongo.Collection
16+
tasks *mongo.Collection
17+
}
18+
19+
func NewMongo(db *mongo.Database) *Mongo {
20+
return &Mongo{
21+
jobs: db.Collection("jobs"),
22+
tasks: db.Collection("tasks"),
23+
}
24+
}
25+
26+
func (m *Mongo) RegisterJob(ctx context.Context, cfg *stepper.JobConfig) error {
27+
nextLaunchAt, err := cfg.NextLaunch()
28+
if err != nil {
29+
return err
30+
}
31+
32+
query := bson.M{"name": cfg.Name}
33+
update := bson.M{
34+
"nextLaunchAt": nextLaunchAt,
35+
"name": cfg.Name,
36+
"tags": cfg.Tags,
37+
"pattern": cfg.Pattern,
38+
"status": "created",
39+
}
40+
41+
opts := options.FindOneAndReplace().SetUpsert(true).SetReturnDocument(options.After)
42+
43+
return m.jobs.FindOneAndReplace(ctx, query, update, opts).Err()
44+
}
45+
46+
func (m *Mongo) CreateTask(ctx context.Context, task *stepper.Task) error {
47+
t := Task{}
48+
t.FromModel(task)
49+
_, err := m.tasks.InsertOne(ctx, t)
50+
return err
51+
}
52+
53+
func (m *Mongo) SetState(ctx context.Context, id string, state []byte) error {
54+
query := bson.M{"id": id}
55+
update := bson.M{"$set": bson.M{"state": state}}
56+
57+
if err := m.tasks.FindOneAndUpdate(ctx, query, update).Err(); err != nil {
58+
if err == mongo.ErrNoDocuments {
59+
return nil
60+
}
61+
62+
return err
63+
}
64+
65+
return nil
66+
}
67+
68+
func (m *Mongo) FindNextTask(ctx context.Context, statuses []string) (*stepper.Task, error) {
69+
var job Task
70+
71+
query := bson.M{
72+
"status": bson.M{"$in": statuses},
73+
"launchAt": bson.M{
74+
"$lte": time.Now(),
75+
},
76+
"$or": []bson.M{
77+
{"lock_at": nil},
78+
{"lock_at": bson.M{"$lte": time.Now().Add(time.Minute * -1)}}, // TODO pass right timeout
79+
},
80+
}
81+
82+
update := bson.M{
83+
"$set": bson.M{
84+
"lock_at": time.Now(),
85+
"status": "in_progress",
86+
},
87+
}
88+
89+
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
90+
91+
if err := m.tasks.FindOneAndUpdate(ctx, query, update, opts).Decode(&job); err != nil {
92+
if err == mongo.ErrNoDocuments {
93+
return nil, nil
94+
}
95+
96+
return nil, err
97+
}
98+
99+
return job.ToModel(), nil
100+
}
101+
102+
func (m *Mongo) FindNextJob(ctx context.Context, statuses []string) (*stepper.Job, error) {
103+
var _job job
104+
105+
query := bson.M{
106+
"status": bson.M{"$in": statuses},
107+
"nextLaunchAt": bson.M{
108+
"$lte": time.Now(),
109+
},
110+
"$or": []bson.M{
111+
{"lock_at": nil},
112+
{"lock_at": bson.M{"$lte": time.Now().Add(time.Minute * 5)}}, // TODO pass right timeout
113+
},
114+
}
115+
116+
update := bson.M{
117+
"$set": bson.M{
118+
"lock_at": time.Now(),
119+
"status": "in_progress",
120+
},
121+
}
122+
123+
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
124+
125+
if err := m.jobs.FindOneAndUpdate(ctx, query, update, opts).Decode(&_job); err != nil {
126+
if err == mongo.ErrNoDocuments {
127+
return nil, nil
128+
}
129+
130+
return nil, err
131+
}
132+
133+
return _job.ToModel(), nil
134+
}
135+
136+
func (m *Mongo) GetUnreleasedJobChildren(ctx context.Context, jobId string) (*stepper.Task, error) {
137+
var task Task
138+
139+
query := bson.M{
140+
"status": bson.M{"$in": []string{"created", "in_progress"}},
141+
"jobId": jobId,
142+
}
143+
144+
if err := m.tasks.FindOne(ctx, query).Decode(&task); err != nil {
145+
if err == mongo.ErrNoDocuments {
146+
return nil, nil
147+
}
148+
149+
return nil, err
150+
}
151+
152+
return task.ToModel(), nil
153+
}
154+
155+
func (m *Mongo) GetUnreleasedTaskChildren(ctx context.Context, id string) (*stepper.Task, error) {
156+
var task Task
157+
158+
query := bson.M{
159+
"status": bson.M{"$in": []string{"created", "in_progress"}},
160+
"parent": id,
161+
}
162+
163+
if err := m.tasks.FindOne(ctx, query).Decode(&task); err != nil {
164+
if err == mongo.ErrNoDocuments {
165+
return nil, nil
166+
}
167+
168+
return nil, err
169+
}
170+
171+
return task.ToModel(), nil
172+
}
173+
174+
func (m *Mongo) GetRelatedTask(ctx context.Context, task string, id string) (*stepper.Task, error) {
175+
query := bson.M{"custom_id": id, "name": task, "status": bson.M{"$ne": "released"}}
176+
177+
var e Task
178+
179+
if err := m.tasks.FindOne(ctx, query).Decode(&e); err != nil {
180+
if err == mongo.ErrNoDocuments {
181+
return nil, nil
182+
}
183+
184+
return nil, err
185+
}
186+
187+
return e.ToModel(), nil
188+
}
189+
190+
func (m *Mongo) Release(ctx context.Context, name string, nextTimeLaunch time.Time) error {
191+
return m.jobs.FindOneAndUpdate(
192+
ctx,
193+
bson.M{"name": name},
194+
bson.M{"$set": bson.M{
195+
"lock_at": nil,
196+
"status": "released",
197+
"nextLaunchAt": nextTimeLaunch,
198+
}},
199+
).Err()
200+
}
201+
202+
func (m *Mongo) FailTask(ctx context.Context, id string, handlerErr error, timeout time.Duration) error {
203+
return m.tasks.FindOneAndUpdate(
204+
ctx,
205+
bson.M{"id": id},
206+
bson.M{"$set": bson.M{
207+
"launchAt": time.Now().Add(timeout),
208+
"status": "failed",
209+
"error": handlerErr.Error(),
210+
}},
211+
).Err()
212+
}
213+
214+
func (m *Mongo) ReleaseTask(ctx context.Context, id string) error {
215+
return m.tasks.FindOneAndUpdate(
216+
ctx,
217+
bson.M{"id": id},
218+
bson.M{"$set": bson.M{
219+
"lock_at": nil,
220+
"status": "released",
221+
}},
222+
).Err()
223+
}
224+
225+
func (m *Mongo) WaitForSubtasks(ctx context.Context, name string) error {
226+
return m.jobs.FindOneAndUpdate(
227+
ctx,
228+
bson.M{"name": name},
229+
bson.M{"$set": bson.M{
230+
"lock_at": nil,
231+
"status": "waiting",
232+
"nextLaunchAt": time.Now().Add(time.Second * 5),
233+
}},
234+
).Err()
235+
}
236+
237+
func (m *Mongo) WaitTaskForSubtasks(ctx context.Context, id string) error {
238+
return m.tasks.FindOneAndUpdate(
239+
ctx,
240+
bson.M{"id": id},
241+
bson.M{"$set": bson.M{
242+
"lock_at": nil,
243+
"status": "waiting",
244+
"launchAt": time.Now().Add(time.Second * 5),
245+
}},
246+
).Err()
247+
}

‎engines/mongo/task.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package mongo
2+
3+
import (
4+
"time"
5+
6+
"github.com/matroskin13/stepper"
7+
)
8+
9+
type Task struct {
10+
ID string `bson:"id"`
11+
CustomId string `bson:"custom_id"`
12+
Name string `bson:"name"`
13+
Data []byte `bson:"data"`
14+
JobId string `bson:"jobId"`
15+
Parent string `bson:"parent"`
16+
LaunchAt time.Time `bson:"launchAt"`
17+
Status string `bson:"status"`
18+
LockAt *time.Time `bson:"lock_at"`
19+
State []byte `bson:"state"`
20+
MiddlewaresState map[string][]byte `bson:"middlewares_state"`
21+
}
22+
23+
func (t *Task) FromModel(model *stepper.Task) {
24+
t.ID = model.ID
25+
t.CustomId = model.CustomId
26+
t.Name = model.Name
27+
t.Data = model.Data
28+
t.JobId = model.JobId
29+
t.Parent = model.Parent
30+
t.LaunchAt = model.LaunchAt
31+
t.Status = model.Status
32+
t.LockAt = model.LockAt
33+
t.State = model.State
34+
t.MiddlewaresState = model.MiddlewaresState
35+
}
36+
37+
func (t *Task) ToModel() *stepper.Task {
38+
return &stepper.Task{
39+
ID: t.ID,
40+
Name: t.Name,
41+
Data: t.Data,
42+
JobId: t.JobId,
43+
Parent: t.Parent,
44+
LaunchAt: t.LaunchAt,
45+
Status: t.Status,
46+
LockAt: t.LockAt,
47+
State: t.State,
48+
MiddlewaresState: t.MiddlewaresState,
49+
CustomId: t.CustomId,
50+
}
51+
}

‎examples/cron/main.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
8+
"github.com/matroskin13/stepper"
9+
mongoEngine "github.com/matroskin13/stepper/engines/mongo"
10+
"github.com/matroskin13/stepper/examples"
11+
)
12+
13+
func main() {
14+
db, err := examples.CreateMongoDatabase("stepepr")
15+
if err != nil {
16+
log.Fatal(err)
17+
}
18+
19+
e := mongoEngine.NewMongo(db)
20+
s := stepper.NewService(e, e)
21+
22+
s.RegisterJob(context.Background(), &stepper.JobConfig{
23+
Name: "log-job",
24+
Pattern: "@every 15s",
25+
}, func(ctx stepper.Context) error {
26+
fmt.Println("wake up the log-job")
27+
28+
ctx.CreateSubtask(stepper.CreateTask{
29+
Name: "log-subtask",
30+
Data: []byte("Hello 1 subtask"),
31+
})
32+
33+
ctx.CreateSubtask(stepper.CreateTask{
34+
Name: "log-subtask",
35+
Data: []byte("Hello 2 subtask"),
36+
})
37+
38+
return nil
39+
}).OnFinish(func(ctx stepper.Context, data []byte) error {
40+
fmt.Println("success job log-job")
41+
42+
return nil
43+
})
44+
45+
s.TaskHandler("log-subtask", func(ctx stepper.Context, data []byte) error {
46+
fmt.Println("message from subtask:", string(data))
47+
return nil
48+
})
49+
50+
if err := s.Listen(context.Background()); err != nil {
51+
log.Fatal(err)
52+
}
53+
}

‎examples/db.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package examples
2+
3+
import (
4+
"context"
5+
"os"
6+
"time"
7+
8+
"go.mongodb.org/mongo-driver/event"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
"go.mongodb.org/mongo-driver/mongo/options"
11+
)
12+
13+
func CreateMongoDatabase(dbName string) (*mongo.Database, error) {
14+
cmdMonitor := &event.CommandMonitor{
15+
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
16+
// log.Print(evt.Command)
17+
},
18+
}
19+
20+
mongoHost := os.Getenv("MONGO_HOST")
21+
if mongoHost == "" {
22+
mongoHost = "mongodb://localhost:27017"
23+
}
24+
25+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
26+
defer cancel()
27+
mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoHost).SetMonitor(cmdMonitor))
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
db := mongoClient.Database(dbName)
33+
34+
return db, nil
35+
}

‎examples/simple/main.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/matroskin13/stepper"
10+
mongoEngine "github.com/matroskin13/stepper/engines/mongo"
11+
"github.com/matroskin13/stepper/examples"
12+
"github.com/matroskin13/stepper/middlewares"
13+
)
14+
15+
type State struct {
16+
Count int
17+
}
18+
19+
func main() {
20+
db, err := examples.CreateMongoDatabase("stepepr")
21+
if err != nil {
22+
log.Fatal(err)
23+
}
24+
25+
e := mongoEngine.NewMongo(db)
26+
s := stepper.NewService(e, e)
27+
28+
s.UseMiddleware(middlewares.LogMiddleware())
29+
30+
s.TaskHandler("simple", func(ctx stepper.Context, data []byte) error {
31+
fmt.Println(string(data))
32+
33+
return nil
34+
})
35+
36+
for i := 0; i < 10; i++ {
37+
if err := s.Publish(context.Background(), "simple", []byte(fmt.Sprintf("hello from %v", i)), stepper.LaunchAt(time.Now().Add(time.Second*10))); err != nil {
38+
log.Fatal(err)
39+
}
40+
}
41+
42+
if err := s.Listen(context.Background()); err != nil {
43+
log.Fatal(err)
44+
}
45+
}

‎examples/subtasks/main.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"strings"
8+
9+
"github.com/matroskin13/stepper"
10+
mongoEngine "github.com/matroskin13/stepper/engines/mongo"
11+
"github.com/matroskin13/stepper/examples"
12+
)
13+
14+
func main() {
15+
db, err := examples.CreateMongoDatabase("stepepr")
16+
if err != nil {
17+
log.Fatal(err)
18+
}
19+
20+
e := mongoEngine.NewMongo(db)
21+
s := stepper.NewService(e, e)
22+
23+
s.TaskHandler("task-with-subtasks", func(ctx stepper.Context, data []byte) error {
24+
fmt.Println("have received the word for splitting: ", string(data))
25+
26+
for _, symbol := range strings.Split(string(data), "") {
27+
ctx.CreateSubtask(stepper.CreateTask{
28+
Name: "letter-subtask",
29+
Data: []byte(symbol),
30+
})
31+
}
32+
33+
return nil
34+
}).OnFinish(func(ctx stepper.Context, data []byte) error {
35+
fmt.Println("subtasks are over")
36+
return nil
37+
})
38+
39+
s.TaskHandler("letter-subtask", func(ctx stepper.Context, data []byte) error {
40+
fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
41+
return nil
42+
})
43+
44+
if err := s.Publish(context.Background(), "task-with-subtasks", []byte("hello")); err != nil {
45+
log.Fatal(err)
46+
}
47+
48+
if err := s.Listen(context.Background()); err != nil {
49+
log.Fatal(err)
50+
}
51+
}

‎examples/thread/main.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"strings"
8+
9+
"github.com/matroskin13/stepper"
10+
mongoEngine "github.com/matroskin13/stepper/engines/mongo"
11+
"github.com/matroskin13/stepper/examples"
12+
)
13+
14+
func main() {
15+
db, err := examples.CreateMongoDatabase("stepepr")
16+
if err != nil {
17+
log.Fatal(err)
18+
}
19+
20+
e := mongoEngine.NewMongo(db)
21+
s := stepper.NewService(e, e)
22+
23+
s.TaskHandler("task-with-threads", func(ctx stepper.Context, data []byte) error {
24+
fmt.Println("have received the word for splitting: ", string(data))
25+
26+
for _, symbol := range strings.Split(string(data), "") {
27+
ctx.CreateSubtask(stepper.CreateTask{
28+
Data: []byte(symbol),
29+
})
30+
}
31+
32+
return nil
33+
}).Subtask(func(ctx stepper.Context, data []byte) error {
34+
fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
35+
return nil
36+
}).OnFinish(func(ctx stepper.Context, data []byte) error {
37+
fmt.Println("subtasks are over")
38+
return nil
39+
})
40+
41+
if err := s.Publish(context.Background(), "task-with-threads", []byte("hello")); err != nil {
42+
log.Fatal(err)
43+
}
44+
45+
if err := s.Listen(context.Background()); err != nil {
46+
log.Fatal(err)
47+
}
48+
}

‎go.mod

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
module github.com/matroskin13/stepper
2+
3+
go 1.18
4+
5+
require (
6+
github.com/robfig/cron/v3 v3.0.1
7+
github.com/samber/lo v1.33.0
8+
go.mongodb.org/mongo-driver v1.11.0
9+
golang.org/x/sync v0.1.0
10+
)
11+
12+
require (
13+
github.com/golang/snappy v0.0.1 // indirect
14+
github.com/klauspost/compress v1.13.6 // indirect
15+
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
16+
github.com/pkg/errors v0.9.1 // indirect
17+
github.com/rs/xid v1.4.0 // indirect
18+
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
19+
github.com/xdg-go/scram v1.1.1 // indirect
20+
github.com/xdg-go/stringprep v1.0.3 // indirect
21+
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
22+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
23+
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
24+
golang.org/x/text v0.3.7 // indirect
25+
)

‎go.sum

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
5+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
6+
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
7+
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
8+
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
9+
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
10+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
11+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
12+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
13+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
14+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
15+
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
16+
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
17+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
18+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
19+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
20+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
21+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
22+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
23+
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
24+
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
25+
github.com/samber/lo v1.33.0 h1:2aKucr+rQV6gHpY3bpeZu69uYoQOzVhGT3J22Op6Cjk=
26+
github.com/samber/lo v1.33.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8=
27+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
28+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
29+
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
30+
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
31+
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
32+
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
33+
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
34+
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
35+
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
36+
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
37+
github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
38+
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
39+
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
40+
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
41+
go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
42+
go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
43+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
44+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
45+
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
46+
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
47+
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
48+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
49+
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
50+
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
51+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
52+
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
53+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
54+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
55+
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
56+
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
57+
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
58+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
59+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
60+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
61+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
62+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
63+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
64+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
65+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

‎job.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package stepper
2+
3+
import (
4+
"time"
5+
6+
"github.com/robfig/cron/v3"
7+
)
8+
9+
type Job struct {
10+
Status string `json:"status"`
11+
Name string `json:"name"`
12+
Pattern string `json:"pattern"`
13+
NextLaunchAt time.Time `json:"naxtLaunchAt"`
14+
}
15+
16+
func (j *Job) CalculateNextLaunch() error {
17+
specParser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
18+
19+
schedule, err := specParser.Parse(j.Pattern)
20+
if err != nil {
21+
return err
22+
}
23+
24+
j.NextLaunchAt = schedule.Next(time.Now())
25+
26+
return nil
27+
}
28+
29+
type JobConfig struct {
30+
Tags []string
31+
Name string
32+
Pattern string
33+
}
34+
35+
func (c *JobConfig) NextLaunch() (time.Time, error) {
36+
specParser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
37+
38+
schedule, err := specParser.Parse(c.Pattern)
39+
if err != nil {
40+
return time.Now(), err
41+
}
42+
43+
return schedule.Next(time.Now()), nil
44+
}

‎middleware.go

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package stepper
2+
3+
type MiddlewareFunc func(ctx Context, t *Task) error
4+
type MiddlewareHandler func(t MiddlewareFunc) MiddlewareFunc

‎middlewares/log.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package middlewares
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/matroskin13/stepper"
7+
)
8+
9+
func LogMiddleware() stepper.MiddlewareHandler {
10+
return func(next stepper.MiddlewareFunc) stepper.MiddlewareFunc {
11+
return func(ctx stepper.Context, t *stepper.Task) error {
12+
fmt.Printf("take task=%s with body=%s\r\n", t.Name, string(t.Data))
13+
err := next(ctx, t)
14+
fmt.Printf("result for task=%s: %v\r\n", t.Name, err)
15+
16+
return err
17+
}
18+
}
19+
}

‎pool.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package stepper
2+
3+
import (
4+
"context"
5+
)
6+
7+
func Pool[T any](ctx context.Context, count int, consumer func(*T)) chan *T {
8+
ch := make(chan *T)
9+
10+
for i := 0; i < count; i++ {
11+
go func() {
12+
for item := range ch {
13+
consumer(item)
14+
}
15+
}()
16+
}
17+
18+
return ch
19+
}

‎publish.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package stepper
2+
3+
import "time"
4+
5+
type PublishOption func(c *CreateTask)
6+
7+
func SetDelay(d time.Duration) PublishOption {
8+
return func(c *CreateTask) {
9+
c.LaunchAfter = d
10+
}
11+
}
12+
13+
func LaunchAt(t time.Time) PublishOption {
14+
return func(c *CreateTask) {
15+
c.LaunchAt = t
16+
}
17+
}

‎service.go

+421
Large diffs are not rendered by default.

‎stepper.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package stepper
2+
3+
import "context"
4+
5+
type Stepper interface {
6+
TaskHandler(name string, handler Handler) HandlerStruct
7+
Listen(ctx context.Context) error
8+
Publish(ctx context.Context, name string, data []byte, options ...PublishOption) error
9+
RegisterJob(ctx context.Context, config *JobConfig, h JobHandler) HandlerStruct
10+
UseMiddleware(h MiddlewareHandler)
11+
}

‎tasks.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package stepper
2+
3+
import (
4+
"time"
5+
)
6+
7+
type Task struct {
8+
ID string `json:"_id"`
9+
CustomId string `bson:"custom_id"`
10+
Name string `json:"name"`
11+
Data []byte `json:"data"`
12+
JobId string `json:"jobId"`
13+
Parent string `json:"parent"`
14+
LaunchAt time.Time `json:"launchAt"`
15+
Status string `json:"status"`
16+
LockAt *time.Time `json:"lock_at"`
17+
State []byte `json:"state"`
18+
MiddlewaresState map[string][]byte `json:"middlewares_state"`
19+
}
20+
21+
func (t *Task) IsWaiting() bool {
22+
return t.Status == "waiting"
23+
}
24+
25+
type CreateTask struct {
26+
Name string
27+
Data []byte
28+
CustomId string
29+
LaunchAfter time.Duration
30+
LaunchAt time.Time
31+
}

‎utils.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package stepper
2+
3+
func Or[T comparable](first, second T) T {
4+
var zero T
5+
6+
if first == zero {
7+
return second
8+
}
9+
10+
return first
11+
}
12+
13+
func Apply[T any](initial *T, callbacks []func(*T)) *T {
14+
for _, callback := range callbacks {
15+
callback(initial)
16+
}
17+
18+
return initial
19+
}

0 commit comments

Comments
 (0)
Please sign in to comment.