@@ -25,14 +25,19 @@ import (
25
25
kubeconv1alpha1 "github.com/Huang-Wei/shared-loadbalancer/pkg/apis/kubecon/v1alpha1"
26
26
"golang.org/x/oauth2/google"
27
27
compute "google.golang.org/api/compute/v1"
28
+ "google.golang.org/api/googleapi"
28
29
corev1 "k8s.io/api/core/v1"
29
30
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
31
"k8s.io/apimachinery/pkg/types"
31
32
)
32
33
34
+ // Ref:
35
+ // - gcloud auth application-default login
36
+ // - gcloud auth application-default print-access-token
37
+
33
38
const PORTRANGE = "30000-32767"
34
39
35
- // GKE stands for Google Kubernetes Service
40
+ // GKE stands for Google Kubernetes Engine
36
41
type GKE struct {
37
42
client * compute.Service
38
43
project , region string
@@ -80,44 +85,49 @@ func initGCE() *compute.Service {
80
85
return computeService
81
86
}
82
87
83
- func (i * GKE ) GetCapacityPerLB () int {
84
- return i .capacityPerLB
88
+ func (g * GKE ) GetCapacityPerLB () int {
89
+ return g .capacityPerLB
85
90
}
86
91
87
- func (i * GKE ) UpdateCache (key types.NamespacedName , lbSvc * corev1.Service ) {
92
+ func (g * GKE ) UpdateCache (key types.NamespacedName , lbSvc * corev1.Service ) {
88
93
if lbSvc == nil {
89
- delete (i .cacheMap , key )
94
+ delete (g .cacheMap , key )
90
95
} else {
91
96
if len (lbSvc .Status .LoadBalancer .Ingress ) > 0 {
92
- // Make this IP Static and then add a Forwarding rule
93
- i . createStaticIP (lbSvc .ObjectMeta .Name + "-ip" , lbSvc .Status .LoadBalancer .Ingress [0 ].IP )
94
- // Let us also create a Forwarding rule to open up all ports.
97
+ // Make this IP Static and then add an Inbound Firewall Rule
98
+ g . ensureStaticIP (lbSvc .ObjectMeta .Name + "-ip" , lbSvc .Status .LoadBalancer .Ingress [0 ].IP )
99
+ // Let us also create an Inbound Firewall Rule to open up all ports.
95
100
// We can also do each port the slb needs but for now this is good enough.
96
- fwRuleName := lbSvc .ObjectMeta .Name + "-rule"
97
- lbPool := i .getTargetPool_usingLBName (lbSvc .ObjectMeta .Name )
98
- if lbPool == nil {
99
- log .WithName ("GKE" ).Info ("Cannot find the TargetPool for the LoadBalancer" )
100
- }
101
- i .forwardingRuleInsert (fwRuleName , lbSvc .Status .LoadBalancer .Ingress [0 ].IP , lbPool .SelfLink )
101
+ g .ensureFirewall (getLBFirewallRuleName (lbSvc .ObjectMeta .Name ))
102
102
}
103
- i .cacheMap [key ] = lbSvc
103
+ g .cacheMap [key ] = lbSvc
104
104
}
105
105
}
106
106
107
- func (i * GKE ) NewService (sharedLB * kubeconv1alpha1.SharedLB ) * corev1.Service {
107
+ func getLBFirewallRuleName (lbName string ) string {
108
+ return fmt .Sprintf ("k8s-fw-%s-autogen" , lbName )
109
+ }
110
+
111
+ func getLBForwardRuleName (lbName string , port int32 , proto corev1.Protocol ) string {
112
+ // note: proto must be lowered
113
+ return fmt .Sprintf ("%s-fwd-rule-%d-%v-autogen" , lbName , port , strings .ToLower (string (proto )))
114
+ }
115
+
116
+ func (g * GKE ) NewService (sharedLB * kubeconv1alpha1.SharedLB ) * corev1.Service {
108
117
return & corev1.Service {
109
118
ObjectMeta : metav1.ObjectMeta {
110
119
Name : sharedLB .Name + SvcPostfix ,
111
120
Namespace : sharedLB .Namespace ,
112
121
},
113
122
Spec : corev1.ServiceSpec {
123
+ Type : corev1 .ServiceTypeNodePort ,
114
124
Ports : sharedLB .Spec .Ports ,
115
125
Selector : sharedLB .Spec .Selector ,
116
126
},
117
127
}
118
128
}
119
129
120
- func (i * GKE ) NewLBService () * corev1.Service {
130
+ func (g * GKE ) NewLBService () * corev1.Service {
121
131
return & corev1.Service {
122
132
ObjectMeta : metav1.ObjectMeta {
123
133
Name : "lb-" + RandStringRunes (8 ),
@@ -138,27 +148,24 @@ func (i *GKE) NewLBService() *corev1.Service {
138
148
// Port: 33333,
139
149
// },
140
150
},
141
- // ExternalIPs: []string {
142
- // ipaddress,
143
- // },
144
151
Type : corev1 .ServiceTypeLoadBalancer ,
145
152
},
146
153
}
147
154
}
148
155
149
- func (i * GKE ) GetAvailabelLB (clusterSvc * corev1.Service ) * corev1.Service {
156
+ func (g * GKE ) GetAvailabelLB (clusterSvc * corev1.Service ) * corev1.Service {
150
157
// we leverage the randomness of golang "for range" when iterating
151
158
OUTERLOOP:
152
- for lbKey , lbSvc := range i .cacheMap {
153
- if len (i .lbToCRs [lbKey ]) >= i .capacityPerLB || len (lbSvc .Status .LoadBalancer .Ingress ) == 0 {
159
+ for lbKey , lbSvc := range g .cacheMap {
160
+ if len (g .lbToCRs [lbKey ]) >= g .capacityPerLB || len (lbSvc .Status .LoadBalancer .Ingress ) == 0 {
154
161
continue
155
162
}
156
163
// must satisfy that all svc ports are not occupied in lbSvc
157
164
for _ , svcPort := range clusterSvc .Spec .Ports {
158
- if i .lbToPorts [lbKey ] == nil {
159
- i .lbToPorts [lbKey ] = int32Set {}
165
+ if g .lbToPorts [lbKey ] == nil {
166
+ g .lbToPorts [lbKey ] = int32Set {}
160
167
}
161
- if _ , ok := i .lbToPorts [lbKey ][svcPort.Port ]; ok {
168
+ if _ , ok := g .lbToPorts [lbKey ][svcPort.Port ]; ok {
162
169
log .WithName ("GKE" ).Info (fmt .Sprintf ("incoming service has port conflict with lbSvc %q on port %d" , lbKey , svcPort .Port ))
163
170
continue OUTERLOOP
164
171
}
@@ -168,77 +175,95 @@ OUTERLOOP:
168
175
return nil
169
176
}
170
177
171
- func (i * GKE ) AssociateLB (crName , lbName types.NamespacedName , clusterSvc * corev1.Service ) error {
172
- // Let us also create a Firewall Rule for this port.
173
- firewallName := fmt .Sprintf ("%s-fwrule-%d" , lbName .Name , clusterSvc .Spec .Ports [0 ].Port )
174
- if i .getFirewallRule (firewallName ) == nil {
175
- i .firewallInsert (firewallName , fmt .Sprint (clusterSvc .Spec .Ports [0 ].Port ))
178
+ func (g * GKE ) ensureForwardingRule (lbName , ip string , nodePort int32 , proto corev1.Protocol ) error {
179
+ fwdRuleName := getLBForwardRuleName (lbName , nodePort , proto )
180
+ if g .getForwardingRule (fwdRuleName ) == nil {
181
+ lbPool := g .getTargetPool_usingLBName (lbName )
182
+ if lbPool == nil {
183
+ return errors .New ("cannot find the TargetPool for the LoadBalancer" )
184
+ }
185
+ return g .insertForwardingRule (fwdRuleName , ip , nodePort , proto , lbPool .SelfLink )
176
186
}
187
+ return nil
188
+ }
189
+
190
+ func (g * GKE ) AssociateLB (crName , lbName types.NamespacedName , clusterSvc * corev1.Service ) error {
177
191
if clusterSvc != nil {
178
- if lbSvc , ok := i .cacheMap [lbName ]; ! ok || len (lbSvc .Status .LoadBalancer .Ingress ) == 0 {
192
+ lbSvc , ok := g .cacheMap [lbName ]
193
+ if ! ok || len (lbSvc .Status .LoadBalancer .Ingress ) == 0 {
179
194
return errors .New ("LoadBalancer service not exist yet" )
180
195
}
181
- // upon program starts, i .lbToPorts[lbName] can be nil
182
- if i .lbToPorts [lbName ] == nil {
183
- i .lbToPorts [lbName ] = int32Set {}
196
+ // upon program starts, g .lbToPorts[lbName] can be nil
197
+ if g .lbToPorts [lbName ] == nil {
198
+ g .lbToPorts [lbName ] = int32Set {}
184
199
}
185
200
// update crToPorts
186
201
for _ , svcPort := range clusterSvc .Spec .Ports {
187
- i.lbToPorts [lbName ][svcPort.Port ] = struct {}{}
202
+ // using NodePort as we haven't found the GKE trick to specify targetPort
203
+ g.lbToPorts [lbName ][svcPort.NodePort ] = struct {}{}
204
+ g .ensureForwardingRule (lbName .Name , lbSvc .Status .LoadBalancer .Ingress [0 ].IP , svcPort .NodePort , svcPort .Protocol )
188
205
}
189
206
}
190
207
191
208
// following code might be called multiple times, but shouldn't impact
192
209
// performance a lot as all of them are O(1) operation
193
- _ , ok := i .lbToCRs [lbName ]
210
+ _ , ok := g .lbToCRs [lbName ]
194
211
if ! ok {
195
- i .lbToCRs [lbName ] = make (nameSet )
212
+ g .lbToCRs [lbName ] = make (nameSet )
196
213
}
197
- i .lbToCRs [lbName ][crName ] = struct {}{}
198
- i .crToLB [crName ] = lbName
214
+ g .lbToCRs [lbName ][crName ] = struct {}{}
215
+ g .crToLB [crName ] = lbName
199
216
log .WithName ("GKE" ).Info ("AssociateLB" , "cr" , crName , "lb" , lbName )
200
217
return nil
201
218
}
202
219
203
220
// DeassociateLB is called by GKE finalizer to clean internal cache
204
221
// no IaaS things should be done for GKE
205
- func (i * GKE ) DeassociateLB (crName types.NamespacedName , clusterSvc * corev1.Service ) error {
222
+ func (g * GKE ) DeassociateLB (crName types.NamespacedName , clusterSvc * corev1.Service ) error {
206
223
var lbName string
207
224
// update internal cache
208
- if lb , ok := i .crToLB [crName ]; ok {
225
+ if lb , ok := g .crToLB [crName ]; ok {
209
226
lbName = lb .Name
210
- delete (i .crToLB , crName )
211
- delete (i .lbToCRs [lb ], crName )
227
+ delete (g .crToLB , crName )
228
+ delete (g .lbToCRs [lb ], crName )
212
229
for _ , svcPort := range clusterSvc .Spec .Ports {
213
- delete (i .lbToPorts [lb ], svcPort .Port )
230
+ delete (g .lbToPorts [lb ], svcPort .Port )
214
231
}
215
232
log .WithName ("GKE" ).Info ("DeassociateLB" , "cr" , crName , "lb" , lb )
216
233
}
217
- firewallName := fmt .Sprintf ("%s-fwrule-%d" , lbName , clusterSvc .Spec .Ports [0 ].Port )
218
- i .firewallDelete (firewallName )
234
+ for _ , svcPort := range clusterSvc .Spec .Ports {
235
+ fwdRuleName := getLBForwardRuleName (lbName , svcPort .NodePort , svcPort .Protocol )
236
+ g .deleteForwardingRule (fwdRuleName )
237
+ }
219
238
return nil
220
239
}
221
240
222
- func (i * GKE ) UpdateService (svc , lb * corev1.Service ) (bool , bool ) {
241
+ func (g * GKE ) UpdateService (svc , lb * corev1.Service ) (bool , bool ) {
223
242
lbName := types.NamespacedName {Name : lb .Name , Namespace : lb .Namespace }
224
- occupiedPorts := i .lbToPorts [lbName ]
243
+ occupiedPorts := g .lbToPorts [lbName ]
225
244
if len (occupiedPorts ) == 0 {
226
245
occupiedPorts = int32Set {}
227
246
}
228
247
portUpdated := updatePort (svc , lb , occupiedPorts )
229
248
return portUpdated , true
230
249
}
231
250
232
- func (g * GKE ) createStaticIP (name , ip string ) {
251
+ func isAlreadyExist (err error ) bool {
252
+ apiErr , ok := err .(* googleapi.Error )
253
+ return ok && (apiErr .Code == 409 || strings .Contains (apiErr .Message , "alreadyExists" ))
254
+ }
255
+
256
+ func (g * GKE ) ensureStaticIP (name , ip string ) {
233
257
addr := & compute.Address {
234
258
Name : name ,
235
259
Address : ip ,
236
260
Region : g .region ,
237
261
}
262
+
238
263
addrInsertCall := g .client .Addresses .Insert (g .project , g .region , addr )
239
264
_ , err := addrInsertCall .Do ()
240
265
241
- if err != nil {
266
+ if err != nil && ! isAlreadyExist ( err ) {
242
267
log .WithName ("gke" ).Error (err , "Faied to secure a Static IP" )
243
268
}
244
269
}
@@ -249,6 +274,7 @@ func (g *GKE) getTargetPool_usingLBName(lb string) *compute.TargetPool {
249
274
log .WithName ("gke" ).Error (err , "Failed to find the loadbalancer " + lb )
250
275
}
251
276
for _ , item := range resp .Items {
277
+ // TODO(Huang-Wei): check "name" for exact match
252
278
if strings .Contains (item .Description , lb ) {
253
279
return item
254
280
}
@@ -272,31 +298,34 @@ func (g *GKE) getForwardingRule(name string) *compute.ForwardingRule {
272
298
return nil
273
299
}
274
300
275
- func (g * GKE ) forwardingRuleInsert (name , ip , item string ) {
276
- fw := & compute.ForwardingRule {
301
+ func (g * GKE ) insertForwardingRule (name , ip string , port int32 , proto corev1. Protocol , item string ) error {
302
+ fwd := & compute.ForwardingRule {
277
303
IPAddress : ip ,
278
- IPProtocol : "TCP" ,
279
- Description : "KubeConDemo, forwarding rule to reach service through port " ,
304
+ IPProtocol : strings . ToUpper ( string ( proto )) ,
305
+ Description : "forwarding rule to reach service through nodePort, generated by SharedLB " ,
280
306
LoadBalancingScheme : "EXTERNAL" ,
281
307
Target : item ,
282
308
Name : name ,
283
- PortRange : PORTRANGE ,
309
+ PortRange : fmt . Sprintf ( "%d-%d" , port , port ) ,
284
310
}
285
- fwInsertCall := g .client .ForwardingRules .Insert (g .project , g .region , fw )
286
- _ , err := fwInsertCall .Do ()
311
+ fwdInsertCall := g .client .ForwardingRules .Insert (g .project , g .region , fwd )
312
+ _ , err := fwdInsertCall .Do ()
287
313
288
314
if err != nil {
289
- log .WithName ("gke" ).Error (err , "Failed to create the Forwarding Rule for the item " + item )
315
+ log .WithName ("gke" ).Error (err , "failed to create the forwarding rule" , "target" , item )
316
+ return fmt .Errorf ("failed to create the forwarding rule for the item %v: %v" , item , err )
290
317
}
318
+ return nil
291
319
}
292
320
293
- func (g * GKE ) forwardingRuleDelete (name string ) {
294
- fwDeleteCall := g .client .ForwardingRules .Delete (g .project , g .region , name )
295
- _ , err := fwDeleteCall .Do ()
321
+ func (g * GKE ) deleteForwardingRule (name string ) error {
322
+ fwdDeleteCall := g .client .ForwardingRules .Delete (g .project , g .region , name )
323
+ _ , err := fwdDeleteCall .Do ()
296
324
297
325
if err != nil {
298
- log . WithName ( "gke" ). Error ( err , "Failed to delete the firewall rule " + name )
326
+ return fmt . Errorf ( "failed to delete the forwarding rule %q" , name )
299
327
}
328
+ return nil
300
329
}
301
330
302
331
func (g * GKE ) getFirewallRule (name string ) * compute.Firewall {
@@ -310,20 +339,24 @@ func (g *GKE) getFirewallRule(name string) *compute.Firewall {
310
339
return resp
311
340
}
312
341
313
- func (g * GKE ) firewallInsert (name , port string ) error {
342
+ func (g * GKE ) ensureFirewall (name string ) error {
314
343
fw := & compute.Firewall {
315
344
Name : name ,
316
345
Allowed : []* compute.FirewallAllowed {
317
346
{
318
347
IPProtocol : "TCP" ,
319
- Ports : []string {port },
348
+ Ports : []string {PORTRANGE },
349
+ },
350
+ {
351
+ IPProtocol : "UDP" ,
352
+ Ports : []string {PORTRANGE },
320
353
},
321
354
},
322
355
}
323
356
fwInsertCall := g .client .Firewalls .Insert (g .project , fw )
324
357
_ , err := fwInsertCall .Do ()
325
358
326
- if err != nil {
359
+ if err != nil && ! isAlreadyExist ( err ) {
327
360
log .WithName ("gke" ).Error (err , "Unable to add a firewall rule" )
328
361
}
329
362
return err
0 commit comments