-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathcontroller.go
116 lines (95 loc) · 2.67 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package tasks
import (
"context"
"sync/atomic"
tasks_config "github.com/ydb-platform/nbs/cloud/tasks/config"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
"github.com/ydb-platform/nbs/cloud/tasks/metrics"
"github.com/ydb-platform/nbs/cloud/tasks/storage"
)
////////////////////////////////////////////////////////////////////////////////
// Controller starts/stops task runners.
type Controller interface {
// StartRunners starts the task runners. A non-nil error means the runners
// could not be started.
StartRunners() error
// HealthChangedCallback is invoked with the new health status of the node.
// The implementation in this file uses it to stop runners when status is
// false and to start them when status is true, but only if node eviction is
// enabled in the config.
HealthChangedCallback(status bool)
}
////////////////////////////////////////////////////////////////////////////////
// controller is the Controller implementation of this package. It keeps the
// arguments needed to call StartRunners together with the state required to
// stop the runners again.
type controller struct {
// runContext is the parent context handed to startRunners and used for
// logging.
runContext context.Context
// runnersCancel cancels the context the current runners were started with.
// It is set by startRunners on success and invoked by stopRunners.
runnersCancel context.CancelFunc
// running is true while the runners are considered started.
running atomic.Bool
// The fields below are passed through unchanged to StartRunners.
taskStorage storage.Storage
registry *Registry
runnerMetricsRegistry metrics.Registry
// config is additionally consulted for GetNodeEvictionEnabled in
// HealthChangedCallback.
config *tasks_config.TasksConfig
host string
}
////////////////////////////////////////////////////////////////////////////////
// StartRunners starts the task runners under the controller's run context and
// returns the result of startRunners.
func (c *controller) StartRunners() error {
	ctx := c.runContext
	return c.startRunners(ctx)
}
// HealthChangedCallback reacts to a change of the node health status.
//
// It does nothing unless node eviction is enabled in the config. When status
// is false the runners are stopped and the call returns. When status is true
// the runners are started via startRunners, which is a no-op if they are
// already running; a start failure is logged and not propagated.
func (c *controller) HealthChangedCallback(status bool) {
	if !c.config.GetNodeEvictionEnabled() {
		return
	}

	// Evict node.
	if !status {
		c.stopRunners()
		logging.Debug(c.runContext, "Stopped runners due to health changed")
		// Must not fall through: starting the runners here would immediately
		// undo the eviction.
		return
	}

	err := c.startRunners(c.runContext)
	if err != nil {
		logging.Error(c.runContext, "Could not restart runners, reason: %v", err)
		return
	}

	logging.Debug(c.runContext, "Restarted runners")
}
////////////////////////////////////////////////////////////////////////////////
// startRunners launches the task runners under a cancellable child of ctx.
//
// It returns nil without doing anything when the runners are already marked
// as running. On success the cancel function of the child context is stored in
// c.runnersCancel so that stopRunners can use it later, and the running flag
// is set. On failure the child context is cancelled and the error from
// StartRunners is returned unchanged.
func (c *controller) startRunners(ctx context.Context) error {
	if c.running.Load() {
		return nil
	}

	ctx, cancel := context.WithCancel(ctx)

	err := StartRunners(
		ctx,
		c.taskStorage,
		c.registry,
		c.runnerMetricsRegistry,
		c.config,
		c.host,
	)
	if err != nil {
		// Nobody else holds cancel on this path, so call it here to release
		// the child context (and signal anything that was already handed it)
		// instead of leaking it.
		cancel()
		return err
	}

	c.runnersCancel = cancel
	c.running.Store(true)
	return nil
}
// stopRunners cancels the context stored by startRunners and clears the
// running flag. It does nothing when the runners are not marked as running.
func (c *controller) stopRunners() {
	if c.running.Load() {
		c.runnersCancel()
		c.running.Store(false)
	}
}
////////////////////////////////////////////////////////////////////////////////
// NewController builds a Controller from the given dependencies.
//
// ctx is passed through logging.WithComponent with ComponentTaskRunner and the
// result is kept as the controller's run context. The remaining arguments are
// stored as-is. The returned controller has its running flag unset (the zero
// value of atomic.Bool); no runners are started here.
func NewController(
	ctx context.Context,
	taskStorage storage.Storage,
	registry *Registry,
	runnerMetricsRegistry metrics.Registry,
	config *tasks_config.TasksConfig,
	host string,
) Controller {
	c := &controller{
		taskStorage:           taskStorage,
		registry:              registry,
		runnerMetricsRegistry: runnerMetricsRegistry,
		config:                config,
		host:                  host,
	}
	c.runContext = logging.WithComponent(ctx, logging.ComponentTaskRunner)
	return c
}