Skip to content

Commit 6684f29

Browse files
authored
Merge pull request #103 from meshery/nithish/refactor/refine_implementation
[Feat] Make use of the data stored in SharedInformer stores and refactor codebase
2 parents 1c9ac67 + d345520 commit 6684f29

File tree

11 files changed

+202
-43
lines changed

11 files changed

+202
-43
lines changed

helpers/component_info.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"name": "meshsync",
33
"type": "controller",
4-
"next_error_code": 1014
5-
}
4+
"next_error_code": 1015
5+
}

internal/config/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
RequestStream = "request-stream"
1515
LogStream = "log-stream"
1616
ExecShell = "exec-shell"
17+
InformerStore = "informer-store"
1718
)
1819

1920
type PipelineConfigs []PipelineConfig

internal/pipeline/error.go

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const (
88
ErrListCode = "1001"
99
ErrPublishCode = "1002"
1010
ErrDynamicClientCode = "1003"
11+
ErrCacheSyncCode = "1014"
1112
)
1213

1314
func ErrDynamicClient(name string, err error) error {
@@ -21,3 +22,7 @@ func ErrList(name string, err error) error {
2122
func ErrPublish(name string, err error) error {
2223
return errors.New(ErrPublishCode, errors.Alert, []string{"Error while publishing for: " + name, err.Error()}, []string{}, []string{}, []string{})
2324
}
25+
26+
func ErrCacheSync(name string, err error) error {
27+
return errors.New(ErrCacheSyncCode, errors.Alert, []string{"Error while syncing the informer store for: " + name, err.Error()}, []string{}, []string{}, []string{})
28+
}

internal/pipeline/handlers.go

+26-14
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@ import (
55
"strconv"
66

77
"github.com/layer5io/meshkit/broker"
8+
internalconfig "github.com/layer5io/meshsync/internal/config"
89
"github.com/layer5io/meshsync/pkg/model"
9-
1010
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1111
"k8s.io/client-go/tools/cache"
1212
)
1313

14-
func (c *ResourceWatcher) startWatching(s cache.SharedIndexInformer) {
14+
func (ri *RegisterInformer) registerHandlers(s cache.SharedIndexInformer) {
1515
handlers := cache.ResourceEventHandlerFuncs{
1616
AddFunc: func(obj interface{}) {
17-
c.log.Info("received add event for:", obj.(*unstructured.Unstructured).GetName())
18-
c.publishItem(obj.(*unstructured.Unstructured), broker.Add)
17+
err := ri.publishItem(obj.(*unstructured.Unstructured), broker.Add, ri.config)
18+
if err != nil {
19+
ri.log.Error(err)
20+
}
21+
ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
1922
},
2023
UpdateFunc: func(oldObj, obj interface{}) {
2124
oldObjCasted := oldObj.(*unstructured.Unstructured)
@@ -25,12 +28,15 @@ func (c *ResourceWatcher) startWatching(s cache.SharedIndexInformer) {
2528
newRV, _ := strconv.ParseInt(oldObjCasted.GetResourceVersion(), 0, 64)
2629

2730
if oldRV < newRV {
28-
c.log.Info("received update event for:", objCasted.GetName())
31+
err := ri.publishItem(obj.(*unstructured.Unstructured), broker.Update, ri.config)
2932

30-
c.publishItem(objCasted, broker.Update)
33+
if err != nil {
34+
ri.log.Error(err)
35+
}
36+
ri.log.Info("Received UPDATE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
3137
} else {
32-
c.log.Debug(fmt.Sprintf(
33-
"skipping update event for: %s => [No changes detected]: %d %d",
38+
ri.log.Debug(fmt.Sprintf(
39+
"Skipping UPDATE event for: %s => [No changes detected]: %d %d",
3440
objCasted.GetName(),
3541
oldRV,
3642
newRV,
@@ -51,21 +57,27 @@ func (c *ResourceWatcher) startWatching(s cache.SharedIndexInformer) {
5157
if ok {
5258
objCasted = possiblyStaleObj.Obj.(*unstructured.Unstructured)
5359
}
54-
c.log.Info("received delete event for:", objCasted.GetName())
55-
c.publishItem(objCasted, broker.Delete)
60+
err := ri.publishItem(objCasted, broker.Delete, ri.config)
61+
62+
if err != nil {
63+
ri.log.Error(err)
64+
}
65+
ri.log.Info("Received DELETE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
5666
},
5767
}
5868
s.AddEventHandler(handlers)
59-
s.Run(c.stopChan)
6069
}
6170

62-
func (c *ResourceWatcher) publishItem(obj *unstructured.Unstructured, evtype broker.EventType) {
63-
err := c.brokerClient.Publish(c.config.PublishTo, &broker.Message{
71+
func (ri *RegisterInformer) publishItem(obj *unstructured.Unstructured, evtype broker.EventType, config internalconfig.PipelineConfig) error {
72+
err := ri.broker.Publish(config.PublishTo, &broker.Message{
6473
ObjectType: broker.MeshSync,
6574
EventType: evtype,
6675
Object: model.ParseList(*obj),
6776
})
6877
if err != nil {
69-
c.log.Error(ErrPublish(c.config.Name, err))
78+
ri.log.Error(ErrPublish(config.Name, err))
79+
return err
7080
}
81+
82+
return nil
7183
}

internal/pipeline/pipeline.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,38 @@ var (
2121
Concurrent: false,
2222
Steps: []pipeline.Step{},
2323
}
24+
25+
StartInformersStage = &pipeline.Stage{
26+
Name: "StartInformers",
27+
Concurrent: false,
28+
Steps: []pipeline.Step{},
29+
}
2430
)
2531

2632
func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}) *pipeline.Pipeline {
2733
// Global discovery
2834
gdstage := GlobalDiscoveryStage
2935
configs := plConfigs[gdstage.Name]
3036
for _, config := range configs {
31-
gdstage.AddStep(addResource(log, informer, broker, config, stopChan))
37+
gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources
3238
}
3339

3440
// Local discovery
3541
ldstage := LocalDiscoveryStage
3642
configs = plConfigs[ldstage.Name]
3743
for _, config := range configs {
38-
ldstage.AddStep(addResource(log, informer, broker, config, stopChan))
44+
ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources
3945
}
4046

47+
// Start informers
48+
strtInfmrs := StartInformersStage
49+
strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers
50+
4151
// Create Pipeline
4252
clusterPipeline := pipeline.New(Name, 1000)
4353
clusterPipeline.AddStage(gdstage)
4454
clusterPipeline.AddStage(ldstage)
55+
clusterPipeline.AddStage(strtInfmrs)
4556

4657
return clusterPipeline
4758
}

internal/pipeline/step.go

+59-21
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,79 @@ import (
88

99
"k8s.io/apimachinery/pkg/runtime/schema"
1010
"k8s.io/client-go/dynamic/dynamicinformer"
11+
"k8s.io/client-go/tools/cache"
1112
)
1213

13-
type ResourceWatcher struct {
14+
type RegisterInformer struct {
1415
pipeline.StepContext
15-
log logger.Handler
16-
informer dynamicinformer.DynamicSharedInformerFactory
17-
brokerClient broker.Handler
18-
config internalconfig.PipelineConfig
19-
stopChan chan struct{}
20-
}
21-
22-
func addResource(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, bclient broker.Handler, config internalconfig.PipelineConfig, stopChan chan struct{}) *ResourceWatcher {
23-
return &ResourceWatcher{
24-
log: log,
25-
informer: informer,
26-
brokerClient: bclient,
27-
config: config,
28-
stopChan: stopChan,
16+
log logger.Handler
17+
informer dynamicinformer.DynamicSharedInformerFactory
18+
config internalconfig.PipelineConfig
19+
broker broker.Handler
20+
}
21+
22+
func newRegisterInformerStep(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, config internalconfig.PipelineConfig, brkr broker.Handler) *RegisterInformer {
23+
return &RegisterInformer{
24+
log: log,
25+
informer: informer,
26+
config: config,
27+
broker: brkr,
2928
}
3029
}
3130

31+
// TODO: Find a way to respond when an informer has stopped for some reason unknown
3232
// Exec - step interface
33-
func (c *ResourceWatcher) Exec(request *pipeline.Request) *pipeline.Result {
34-
gvr, _ := schema.ParseResourceArg(c.config.Name)
35-
iclient := c.informer.ForResource(*gvr)
33+
func (ri *RegisterInformer) Exec(request *pipeline.Request) *pipeline.Result {
34+
gvr, _ := schema.ParseResourceArg(ri.config.Name)
35+
iclient := ri.informer.ForResource(*gvr)
36+
37+
ri.registerHandlers(iclient.Informer())
38+
39+
// add the instance of store to the Result
40+
data := make(map[string]cache.Store)
41+
if request.Data != nil {
42+
data = request.Data.(map[string]cache.Store)
43+
}
44+
data[ri.config.Name] = iclient.Informer().GetStore()
45+
return &pipeline.Result{
46+
Error: nil,
47+
Data: data,
48+
}
49+
}
3650

37-
go c.startWatching(iclient.Informer())
51+
// Cancel - step interface
52+
func (ri *RegisterInformer) Cancel() error {
53+
ri.Status("cancel step")
54+
return nil
55+
}
56+
57+
// StartInformers Step
58+
59+
type StartInformers struct {
60+
pipeline.StepContext
61+
stopChan chan struct{}
62+
informer dynamicinformer.DynamicSharedInformerFactory
63+
log logger.Handler
64+
}
65+
66+
func newStartInformersStep(stopChan chan struct{}, log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory) *StartInformers {
67+
return &StartInformers{
68+
log: log,
69+
informer: informer,
70+
stopChan: stopChan,
71+
}
72+
}
3873

74+
func (si *StartInformers) Exec(request *pipeline.Request) *pipeline.Result {
75+
si.informer.Start(si.stopChan)
3976
return &pipeline.Result{
4077
Error: nil,
78+
Data: request.Data,
4179
}
4280
}
4381

4482
// Cancel - step interface
45-
func (c *ResourceWatcher) Cancel() error {
46-
c.Status("cancel step")
83+
func (si *StartInformers) Cancel() error {
84+
si.Status("cancel step")
4785
return nil
4886
}

main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func main() {
7070
Username: "",
7171
Password: "",
7272
ReconnectWait: 2 * time.Second,
73-
MaxReconnect: 5,
73+
MaxReconnect: 60,
7474
})
7575
if err != nil {
7676
log.Error(err)

meshsync/discovery.go

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package meshsync
33
import (
44
"github.com/layer5io/meshsync/internal/config"
55
"github.com/layer5io/meshsync/internal/pipeline"
6+
"k8s.io/client-go/tools/cache"
67
)
78

89
func (h *Handler) startDiscovery(pipelineCh chan struct{}) {
@@ -15,6 +16,7 @@ func (h *Handler) startDiscovery(pipelineCh chan struct{}) {
1516
h.Log.Info("Pipeline started")
1617
pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh)
1718
result := pl.Run()
19+
h.stores = result.Data.(map[string]cache.Store)
1820
if result.Error != nil {
1921
h.Log.Error(ErrNewPipeline(result.Error))
2022
}

meshsync/exec.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (h *Handler) getActiveChannels() []*string {
7979
return activeChannels
8080
}
8181

82-
func (h *Handler) streamChannelPool() error {
82+
func (h *Handler) streamChannelPool() {
8383
go func() {
8484
for {
8585
err := h.Broker.Publish("active_sessions.exec", &broker.Message{
@@ -93,8 +93,6 @@ func (h *Handler) streamChannelPool() error {
9393
time.Sleep(10 * time.Second)
9494
}
9595
}()
96-
97-
return nil
9896
}
9997

10098
func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.ListenerConfig) {

0 commit comments

Comments
 (0)