Skip to content
This repository was archived by the owner on Jan 22, 2021. It is now read-only.

Commit 9a93b82

Browse files
committed
Adds option to watch specific namespaces, some refactoring
Watching resources only in some specified namespaces is frequently required in order to provide multi-tenancy or limit the reach of ingress controller for security purposes. So there's now a config option to provide a list of namespaces. In case none are provided it assumes all namespaces must be watched. This also does some refactoring of how resources are watched, making use of API informers for ingresses and pods. It also makes sure caches get synchronised when starting the informers. Since Kubernetes 1.14 ingresses are now part of the "networking" API instead of "extensions". While there's deprecation period, this change introduces support for both depending on the k8s version currently running. Minor fixes and improvements added. Fixes #27.
1 parent 2ded298 commit 9a93b82

File tree

6 files changed

+142
-66
lines changed

6 files changed

+142
-66
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ tyk-k8s
22
tyk-k8s.yaml
33
docker/cfssl-k8s/ca-quickstart/
44
make-docker.sh
5-
push-docker.sh
5+
push-docker.sh
6+
.idea

ca/ca.go

-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func New(cfg *Config) (*Client, error) {
9191
CA: cfg,
9292
}
9393

94-
log.Info(cfg)
9594
if cfg.CertPath == "" {
9695
return nil, fmt.Errorf("root CA certificate is required for bundling")
9796
}

cmd/start.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ var startCmd = &cobra.Command{
4848
log.Fatal(err)
4949
}
5050

51+
// Ingress controller configuration
52+
ingConf := &ingress.Config{}
53+
if err := viper.UnmarshalKey("Ingress", ingConf); err != nil {
54+
log.Fatalf("couldn't read Ingress config: %v", err)
55+
}
56+
controller := ingress.Controller().Config(ingConf)
57+
5158
whs := &injector.WebhookServer{
5259
SidecarConfig: whConf,
5360
CAConfig: caConf,
@@ -65,9 +72,7 @@ var startCmd = &cobra.Command{
6572
webserver.Server().AddRoute("POST", "/inject", whs.Serve)
6673

6774
// Ingress controller
68-
ingress.NewController()
69-
err = ingress.Controller().Start()
70-
if err != nil {
75+
if err := controller.Start(); err != nil {
7176
log.Fatal(err)
7277
}
7378
log.Info("ingress controller started")
@@ -82,11 +87,9 @@ var startCmd = &cobra.Command{
8287
log.Error(err)
8388
}
8489

85-
err = ingress.Controller().Stop()
86-
if err != nil {
90+
if err := controller.Stop(); err != nil {
8791
log.Error(err)
8892
}
89-
9093
},
9194
}
9295

ingress/ingress.go

+122-54
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,69 @@ import (
1616
"github.com/TykTechnologies/tyk-k8s/tyk"
1717
"k8s.io/api/core/v1"
1818
"k8s.io/api/extensions/v1beta1"
19-
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
20-
"k8s.io/apimachinery/pkg/fields"
19+
netv1beta1 "k8s.io/api/networking/v1beta1"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
"k8s.io/apimachinery/pkg/runtime"
22+
"k8s.io/apimachinery/pkg/util/version"
23+
"k8s.io/client-go/informers"
2124
"k8s.io/client-go/kubernetes"
2225
"k8s.io/client-go/rest"
2326
"k8s.io/client-go/tools/cache"
2427
"k8s.io/client-go/tools/clientcmd"
2528
)
2629

27-
type Config struct{}
30+
type Config struct {
31+
WatchNamespaces []string
32+
}
2833

2934
var ctrl *ControlServer
3035
var log = logger.GetLogger("ingress")
3136
var opLog = sync.Map{}
37+
var runtimeScheme = runtime.NewScheme()
3238

3339
const (
3440
IngressAnnotation = "kubernetes.io/ingress.class"
3541
IngressAnnotationValue = "tyk"
42+
defaultResync = 2 * time.Minute
3643
)
3744

3845
type ControlServer struct {
39-
cfg *Config
40-
client *kubernetes.Clientset
41-
store cache.Store
42-
ingressController cache.Controller
43-
podController cache.Controller
44-
stopCh chan struct{}
46+
cfg *Config
47+
client *kubernetes.Clientset
48+
stopCh chan struct{}
49+
factories map[string]informers.SharedInformerFactory
50+
isNetworkingIngress bool
51+
}
52+
53+
func init() {
54+
v1beta1.AddToScheme(runtimeScheme)
55+
netv1beta1.AddToScheme(runtimeScheme)
56+
}
57+
58+
func convertIngress(obj interface{}) (*netv1beta1.Ingress, bool) {
59+
extIngress, ok := obj.(*v1beta1.Ingress)
60+
if ok {
61+
netIngress := &netv1beta1.Ingress{}
62+
if err := runtimeScheme.Convert(extIngress, netIngress, nil); err != nil {
63+
log.Errorf("error converting ingress from extensions/v1beta1: %v", err)
64+
return nil, false
65+
}
66+
67+
return netIngress, true
68+
}
69+
70+
if ing, ok := obj.(*netv1beta1.Ingress); ok {
71+
return ing, true
72+
}
73+
74+
return nil, false
4575
}
4676

4777
func NewController() *ControlServer {
4878
if ctrl == nil {
49-
ctrl = &ControlServer{}
79+
ctrl = &ControlServer{
80+
factories: make(map[string]informers.SharedInformerFactory),
81+
}
5082
}
5183

5284
return ctrl
@@ -60,6 +92,11 @@ func Controller() *ControlServer {
6092
return ctrl
6193
}
6294

95+
func (c *ControlServer) Config(cfg *Config) *ControlServer {
96+
c.cfg = cfg
97+
return c
98+
}
99+
63100
func (c *ControlServer) getClient() (*kubernetes.Clientset, error) {
64101
cfgF := os.Getenv("TYK_K8S_KUBECONF")
65102
var config *rest.Config
@@ -85,10 +122,9 @@ func (c *ControlServer) Start() error {
85122
if err != nil {
86123
return err
87124
}
125+
c.setNetworkingIngress()
88126

89-
c.watchIngresses()
90-
c.watchPods()
91-
return nil
127+
return c.watchAll()
92128
}
93129

94130
func (c *ControlServer) Stop() error {
@@ -110,7 +146,7 @@ func (c *ControlServer) getAPIName(name, service string) string {
110146
return v
111147
}
112148

113-
func (c *ControlServer) generateIngressID(ingressName, ns string, p v1beta1.HTTPIngressPath) string {
149+
func (c *ControlServer) generateIngressID(ingressName, ns string, p netv1beta1.HTTPIngressPath) string {
114150
serviceFQDN := fmt.Sprintf("%s.%s.%s/%s", ingressName, ns, p.Backend.ServiceName, p.Path)
115151
hasher := sha1.New()
116152
hasher.Write([]byte(serviceFQDN))
@@ -119,12 +155,12 @@ func (c *ControlServer) generateIngressID(ingressName, ns string, p v1beta1.HTTP
119155
return sha
120156
}
121157

122-
func (c *ControlServer) handleTLS(ing *v1beta1.Ingress) (map[string]string, error) {
158+
func (c *ControlServer) handleTLS(ing *netv1beta1.Ingress) (map[string]string, error) {
123159
log.Info("checking for TLS entries")
124160
certMap := map[string]string{}
125161
for _, iTLS := range ing.Spec.TLS {
126162
log.Info("found TLS entry: ", iTLS.String())
127-
sec, err := c.client.CoreV1().Secrets(ing.Namespace).Get(iTLS.SecretName, v12.GetOptions{})
163+
sec, err := c.client.CoreV1().Secrets(ing.Namespace).Get(iTLS.SecretName, metav1.GetOptions{})
128164
if err != nil {
129165
return nil, err
130166
}
@@ -156,7 +192,7 @@ func (c *ControlServer) handleTLS(ing *v1beta1.Ingress) (map[string]string, erro
156192

157193
}
158194

159-
func checkAndGetTemplate(ing *v1beta1.Ingress) string {
195+
func checkAndGetTemplate(ing *netv1beta1.Ingress) string {
160196
for k, v := range ing.Annotations {
161197
if k == tyk.TemplateNameKey {
162198
log.Infof("template annotation found with value: %v", v)
@@ -167,7 +203,7 @@ func checkAndGetTemplate(ing *v1beta1.Ingress) string {
167203
return tyk.DefaultIngressTemplate
168204
}
169205

170-
func (c *ControlServer) doAdd(ing *v1beta1.Ingress) error {
206+
func (c *ControlServer) doAdd(ing *netv1beta1.Ingress) error {
171207
tags := []string{"ingress"}
172208
hName := ""
173209

@@ -227,7 +263,7 @@ func (c *ControlServer) doAdd(ing *v1beta1.Ingress) error {
227263
}
228264

229265
func (c *ControlServer) handleIngressAdd(obj interface{}) {
230-
ing, ok := obj.(*v1beta1.Ingress)
266+
ing, ok := convertIngress(obj)
231267
if !ok {
232268
log.Errorf("type not allowed: %v", reflect.TypeOf(obj))
233269
return
@@ -244,7 +280,7 @@ func (c *ControlServer) handleIngressAdd(obj interface{}) {
244280
}
245281

246282
func (c *ControlServer) handleIngressUpdate(oldObj interface{}, newObj interface{}) {
247-
oldIng, ok := oldObj.(*v1beta1.Ingress)
283+
oldIng, ok := convertIngress(oldObj)
248284
if !ok {
249285
log.Errorf("type not allowed: %v", reflect.TypeOf(oldIng))
250286
return
@@ -254,7 +290,7 @@ func (c *ControlServer) handleIngressUpdate(oldObj interface{}, newObj interface
254290
return
255291
}
256292

257-
newIng, ok := newObj.(*v1beta1.Ingress)
293+
newIng, ok := convertIngress(newObj)
258294
if !ok {
259295
log.Errorf("type not allowed: %v", reflect.TypeOf(newIng))
260296
return
@@ -300,7 +336,7 @@ func (c *ControlServer) handleIngressUpdate(oldObj interface{}, newObj interface
300336

301337
}
302338

303-
func (c *ControlServer) ingressChanged(old *v1beta1.Ingress, new *v1beta1.Ingress) bool {
339+
func (c *ControlServer) ingressChanged(old *netv1beta1.Ingress, new *netv1beta1.Ingress) bool {
304340
if len(new.Spec.Rules) > 0 {
305341
r0 := new.Spec.Rules[0]
306342
hName := r0.Host
@@ -322,7 +358,7 @@ func (c *ControlServer) ingressChanged(old *v1beta1.Ingress, new *v1beta1.Ingres
322358

323359
}
324360

325-
func (c *ControlServer) doDelete(oldIng *v1beta1.Ingress) error {
361+
func (c *ControlServer) doDelete(oldIng *netv1beta1.Ingress) error {
326362
for _, r0 := range oldIng.Spec.Rules {
327363
for _, p := range r0.HTTP.Paths {
328364
sid := c.generateIngressID(oldIng.Name, oldIng.Namespace, p)
@@ -339,7 +375,7 @@ func (c *ControlServer) doDelete(oldIng *v1beta1.Ingress) error {
339375
}
340376

341377
func (c *ControlServer) handleIngressDelete(obj interface{}) {
342-
ing, ok := obj.(*v1beta1.Ingress)
378+
ing, ok := convertIngress(obj)
343379
if !ok {
344380
log.Errorf("type not allowed: %v", reflect.TypeOf(obj))
345381
return
@@ -355,7 +391,7 @@ func (c *ControlServer) handleIngressDelete(obj interface{}) {
355391
}
356392
}
357393

358-
func (c *ControlServer) checkIngressManaged(ing *v1beta1.Ingress) bool {
394+
func (c *ControlServer) checkIngressManaged(ing *netv1beta1.Ingress) bool {
359395
for k, v := range ing.Annotations {
360396
if k == IngressAnnotation {
361397
if strings.ToLower(v) == IngressAnnotationValue {
@@ -367,46 +403,78 @@ func (c *ControlServer) checkIngressManaged(ing *v1beta1.Ingress) bool {
367403
return false
368404
}
369405

370-
func (c *ControlServer) watchIngresses() {
371-
log.Info("Watching for ingress activity")
372-
watchList := cache.NewListWatchFromClient(c.client.ExtensionsV1beta1().RESTClient(), "ingresses", v1.NamespaceAll,
373-
fields.Everything())
374-
c.store, c.ingressController = cache.NewInformer(
375-
watchList,
376-
&v1beta1.Ingress{},
377-
time.Second*10,
378-
cache.ResourceEventHandlerFuncs{
406+
// Checks whether k8s version is 1.14+ and therefore uses networking API for ingresses
407+
func (c *ControlServer) setNetworkingIngress() {
408+
version114, err := version.ParseGeneric("v1.14.0")
409+
if err != nil {
410+
log.Errorf("error parsing version: %v", err)
411+
return
412+
}
413+
414+
discoveredVersion, err := c.client.Discovery().ServerVersion()
415+
if err != nil {
416+
log.Errorf("error discovering k8s version: %v", err)
417+
return
418+
}
419+
420+
k8sVersion, err := version.ParseGeneric(discoveredVersion.String())
421+
if err != nil {
422+
log.Errorf("error parsing discovered k8s version: %v", err)
423+
return
424+
}
425+
426+
c.isNetworkingIngress = k8sVersion.AtLeast(version114)
427+
}
428+
429+
// Watches k8s resources required for ingress controller operations using shared informers
430+
func (c *ControlServer) watchAll() error {
431+
namespaces := c.cfg.WatchNamespaces
432+
if len(namespaces) == 0 {
433+
namespaces = []string{v1.NamespaceAll}
434+
}
435+
436+
for _, ns := range namespaces {
437+
log.Infof("Registering informers for namespace %s", ns)
438+
factory := informers.NewSharedInformerFactoryWithOptions(c.client, defaultResync, informers.WithNamespace(ns))
439+
440+
// Watch ingresses
441+
var ingressesInformer cache.SharedIndexInformer
442+
if c.isNetworkingIngress {
443+
ingressesInformer = factory.Networking().V1beta1().Ingresses().Informer()
444+
} else {
445+
ingressesInformer = factory.Extensions().V1beta1().Ingresses().Informer()
446+
log.Info("Using deprecated extensions/v1beta1 ingresses API")
447+
}
448+
ingressesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
379449
AddFunc: c.handleIngressAdd,
380450
UpdateFunc: c.handleIngressUpdate,
381451
DeleteFunc: c.handleIngressDelete,
382-
},
383-
)
452+
})
384453

385-
c.stopCh = make(chan struct{})
386-
go c.ingressController.Run(c.stopCh)
387-
}
388-
389-
func (c *ControlServer) watchPods() {
390-
log.Info("Watching for pod deletion")
391-
watchList := cache.NewListWatchFromClient(c.client.CoreV1().RESTClient(), "pods", v1.NamespaceAll,
392-
fields.Everything())
393-
c.store, c.podController = cache.NewInformer(
394-
watchList,
395-
&v1.Pod{},
396-
time.Second*10,
397-
cache.ResourceEventHandlerFuncs{
454+
// Watch pods
455+
factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
456+
AddFunc: nil,
457+
UpdateFunc: nil,
398458
DeleteFunc: c.handlePodDelete,
399-
},
400-
)
459+
})
460+
461+
c.factories[ns] = factory
462+
factory.Start(c.stopCh)
401463

402-
c.stopCh = make(chan struct{})
403-
go c.podController.Run(c.stopCh)
464+
for t, ok := range factory.WaitForCacheSync(c.stopCh) {
465+
if !ok {
466+
return fmt.Errorf("failed while syncing %s caches for ns %s", t.String(), ns)
467+
}
468+
}
469+
}
470+
471+
return nil
404472
}
405473

406474
func (c *ControlServer) handlePodDeleteForMesh(pd *v1.Pod) {
407475
log.Info("pod is injector-managed")
408476

409-
remPds, err := c.client.CoreV1().Pods(pd.Namespace).List(v12.ListOptions{})
477+
remPds, err := c.client.CoreV1().Pods(pd.Namespace).List(metav1.ListOptions{})
410478
if err != nil {
411479
log.Error(err)
412480
}

sample-tyk-k8s.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ Tyk:
1212
secret: "set-by-env"
1313
org: "set-by-env"
1414

15+
Ingress:
16+
watchNamespaces:
17+
- default
18+
- myapp
19+
1520
# If last-mile TLS is enabled, this section defines the Certificate Authority
1621
# behaviour, you can use the documentation for CFSSL to better understand what
1722
# the options here do as they are a direct map.

webserver/server.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ func (s *WebServer) AddRoute(method string, route string, handler func(http.Resp
4949
func (s *WebServer) Config(cfg *Config) {
5050
if cfg == nil {
5151
log.Info("using default config on port 9797")
52-
s.cfg = &Config{
53-
Addr: ":9797",
54-
}
55-
return
52+
cfg = &Config{}
53+
}
54+
if cfg.Addr == "" {
55+
cfg.Addr = ":9797"
5656
}
5757

5858
s.cfg = cfg

0 commit comments

Comments
 (0)