forked from libi/dcron
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdcron.go
201 lines (171 loc) · 4.37 KB
/
dcron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package dcron
import (
"context"
"errors"
"log"
"os"
"sync"
"sync/atomic"
"time"
"github.com/BugKillerPro/dcron/dlog"
"github.com/BugKillerPro/dcron/driver"
"github.com/robfig/cron/v3"
)
// defaultReplicas is the value newDcron assigns to Dcron.hashReplicas.
const defaultReplicas = 50

// defaultDuration is the value newDcron assigns to Dcron.nodeUpdateDuration.
const defaultDuration = 3 * time.Second
// Values held in Dcron.running. Start, Run and Stop move between them with
// atomic compare-and-swap operations.
const (
	dcronStopped = iota // 0: the scheduler is not running
	dcronRunning        // 1: the scheduler has been started
)
// RecoverFuncType is the signature of the optional Dcron.RecoverFunc hook.
// Start and Run call the hook with the Dcron itself before they attempt to
// switch the instance into the running state.
type RecoverFuncType func(d *Dcron)
// Dcron is the main struct of the package. It combines a robfig/cron
// scheduler with a node pool that is asked, per job name, whether a job is
// available to this node.
type Dcron struct {
	// jobs holds the registered job wrappers keyed by job name.
	jobs map[string]*JobWarpper

	// jobsRWMut guards jobs; addJob and Remove hold it while touching the map.
	jobsRWMut sync.Mutex

	// ServerName is the name given to the constructor; it is also passed to
	// NewNodePool.
	ServerName string

	// nodePool is built by the constructors via NewNodePool, started by
	// Start/Run and stopped by Stop.
	nodePool INodePool

	// running is dcronRunning or dcronStopped and is accessed atomically.
	running int32

	// logger receives this type's log output; newDcron installs a
	// dlog.StdLogger writing to os.Stdout.
	logger dlog.Logger

	// logInfo is not referenced in this part of the file.
	// NOTE(review): presumably toggled by an Option — confirm.
	logInfo bool

	// nodeUpdateDuration starts as defaultDuration and is passed to NewNodePool.
	nodeUpdateDuration time.Duration

	// hashReplicas starts as defaultReplicas and is passed to NewNodePool.
	hashReplicas int

	// cr is the underlying cron scheduler that jobs are added to.
	cr *cron.Cron

	// crOptions are the options handed to cron.New when cr is created.
	crOptions []cron.Option

	// RecoverFunc, when non-nil, is called by Start and Run before starting.
	RecoverFunc RecoverFuncType
}
// NewDcron builds a Dcron named serverName that uses the given driver for its
// node pool. Any cronOpts are stored on the instance and forwarded to
// cron.New. The returned Dcron is in the stopped state.
func NewDcron(serverName string, driver driver.DriverV2, cronOpts ...cron.Option) *Dcron {
	d := newDcron(serverName)
	d.running = dcronStopped
	d.crOptions = cronOpts
	d.cr = cron.New(d.crOptions...)
	// The node pool is created with the defaults installed by newDcron.
	d.nodePool = NewNodePool(
		serverName,
		driver,
		d.nodeUpdateDuration,
		d.hashReplicas,
		d.logger,
	)
	return d
}
// NewDcronWithOption builds a Dcron named serverName and applies each Option
// in dcronOpts, in order, to it. The options run before the cron scheduler and
// the node pool are created, so the values they set (crOptions,
// nodeUpdateDuration, hashReplicas, logger) are the ones used to build both.
func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...Option) *Dcron {
	d := newDcron(serverName)
	for i := range dcronOpts {
		dcronOpts[i](d)
	}

	d.cr = cron.New(d.crOptions...)
	d.nodePool = NewNodePool(
		serverName,
		driver,
		d.nodeUpdateDuration,
		d.hashReplicas,
		d.logger,
	)
	return d
}
// newDcron returns a Dcron populated with the package defaults: a stdout
// logger prefixed with "[dcron] ", an empty job map, an empty cron option
// list, defaultDuration and defaultReplicas. The cron scheduler and node pool
// are left unset for the public constructors to fill in.
func newDcron(serverName string) *Dcron {
	stdLog := log.New(os.Stdout, "[dcron] ", log.LstdFlags)

	d := new(Dcron)
	d.ServerName = serverName
	d.logger = &dlog.StdLogger{Log: stdLog}
	d.jobs = map[string]*JobWarpper{}
	d.crOptions = []cron.Option{}
	d.nodeUpdateDuration = defaultDuration
	d.hashReplicas = defaultReplicas
	return d
}
// SetLogger replaces the logger used by this Dcron.
//
// NOTE(review): the constructors hand the logger to NewNodePool when the pool
// is created, and this method does not touch d.nodePool, so the pool
// presumably keeps using the earlier logger — confirm.
func (d *Dcron) SetLogger(logger dlog.Logger) {
	d.logger = logger
}
// GetLogger returns the logger currently used by this Dcron.
func (d *Dcron) GetLogger() dlog.Logger {
	return d.logger
}
// AddJob registers job under jobName with the cron spec cronStr. It returns
// an error when jobName is already registered or when the cron scheduler
// rejects the entry.
func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error) {
	// No plain function is supplied on this path, only the Job.
	err = d.addJob(jobName, cronStr, nil, job)
	return err
}
// AddFunc registers the function cmd under jobName with the cron spec
// cronStr. It returns an error when jobName is already registered or when the
// cron scheduler rejects the entry.
func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error) {
	// No Job is supplied on this path, only the plain function.
	err = d.addJob(jobName, cronStr, cmd, nil)
	return err
}
// addJob is the shared implementation behind AddJob and AddFunc. It logs the
// request, then, under the jobs mutex, rejects duplicate names, wraps the
// function/job in a JobWarpper, adds it to the cron scheduler and records it
// in d.jobs together with the entry ID the scheduler returned.
func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) {
	d.logger.Infof("addJob '%s' : %s", jobName, cronStr)

	d.jobsRWMut.Lock()
	defer d.jobsRWMut.Unlock()

	if _, exists := d.jobs[jobName]; exists {
		return errors.New("jobName already exist")
	}

	wrapped := JobWarpper{Name: jobName, CronStr: cronStr, Func: cmd, Job: job, Dcron: d}

	// The wrapper is passed by value, so the scheduler holds a copy taken
	// before ID is assigned; only the entry kept in d.jobs carries the ID.
	id, addErr := d.cr.AddJob(cronStr, wrapped)
	if addErr != nil {
		return addErr
	}

	wrapped.ID = id
	d.jobs[jobName] = &wrapped
	return nil
}
// Remove unregisters the job named jobName: it is deleted from d.jobs and its
// entry is removed from the cron scheduler. Unknown names are ignored.
func (d *Dcron) Remove(jobName string) {
	d.jobsRWMut.Lock()
	defer d.jobsRWMut.Unlock()

	entry, found := d.jobs[jobName]
	if !found {
		return
	}
	delete(d.jobs, jobName)
	d.cr.Remove(entry.ID)
}
// allowThisNodeRun reports what the node pool's CheckJobAvailable returns for
// jobName.
func (d *Dcron) allowThisNodeRun(jobName string) bool {
	available := d.nodePool.CheckJobAvailable(jobName)
	return available
}
// Start calls RecoverFunc (when set), then, if the Dcron is currently
// stopped, starts the node pool and the cron scheduler via cron.Cron.Start.
// If the node pool fails to start the Dcron is put back into the stopped
// state and Start returns without starting the scheduler. Calling Start on an
// already running Dcron only logs a message.
func (d *Dcron) Start() {
	// Recover jobs before starting; this runs even when already running.
	if d.RecoverFunc != nil {
		d.RecoverFunc(d)
	}

	if !atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
		d.logger.Infof("dcron have started")
		return
	}

	if d.startNodePool() != nil {
		// Roll the state back so a later Start/Run can try again.
		atomic.StoreInt32(&d.running, dcronStopped)
		return
	}

	d.cr.Start()
	d.logger.Infof("dcron started , nodeID is %s", d.nodePool.GetNodeID())
}
// Run calls RecoverFunc (when set), then, if the Dcron is currently stopped,
// starts the node pool and hands control to cron.Cron.Run. If the node pool
// fails to start the Dcron is put back into the stopped state and Run
// returns. Calling Run on an already running Dcron only logs a message.
func (d *Dcron) Run() {
	// Recover jobs before starting; this runs even when already running.
	if d.RecoverFunc != nil {
		d.RecoverFunc(d)
	}

	if !atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
		d.logger.Infof("dcron already running")
		return
	}

	if d.startNodePool() != nil {
		// Roll the state back so a later Start/Run can try again.
		atomic.StoreInt32(&d.running, dcronStopped)
		return
	}

	// Log first: the nodeID message is emitted before the scheduler runs.
	d.logger.Infof("dcron running nodeID is %s", d.nodePool.GetNodeID())
	d.cr.Run()
}
// startNodePool starts the node pool with a background context. A failure is
// logged and returned to the caller; success yields nil.
func (d *Dcron) startNodePool() error {
	err := d.nodePool.Start(context.Background())
	if err == nil {
		return nil
	}
	d.logger.Errorf("dcron start node pool error %+v", err)
	return err
}
// Stop stops the node pool and then the cron scheduler.
//
// The node pool is stopped first, with a background context. Stop then polls
// once per millisecond until it manages to flip the running flag from
// dcronRunning to dcronStopped, at which point it stops the cron scheduler,
// logs and returns.
//
// NOTE(review): because the loop only exits on a successful swap, calling
// Stop on a Dcron that is not running (and never becomes running) blocks
// indefinitely.
func (d *Dcron) Stop() {
	tick := time.NewTicker(time.Millisecond)
	// Release the ticker when Stop returns; it was previously never stopped.
	defer tick.Stop()

	d.nodePool.Stop(context.Background())

	for range tick.C {
		if !atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) {
			// Not (yet) in the running state: wait for the next tick.
			continue
		}
		d.cr.Stop()
		d.logger.Infof("dcron stopped")
		return
	}
}