forked from open-cluster-management-io/clusteradm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexec.go
296 lines (270 loc) · 10 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
// Copyright Contributors to the Open Cluster Management project
package accept
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"strings"
"time"
"github.com/spf13/cobra"
certificatesv1 "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
"open-cluster-management.io/clusteradm/pkg/helpers"
)
// Identity markers used by approveCSR to decide whether a CSR on the hub was
// created by a registration agent, plus the label used to find the CSRs of a
// given cluster.
const (
// groupNameBootstrap is one of the two groups a CSR's Spec.Groups must
// contain to pass the requester check.
groupNameBootstrap = "system:bootstrappers:managedcluster"
// userNameSignatureBootstrapPrefix is one of the two accepted prefixes of
// a CSR's Spec.Username.
userNameSignatureBootstrapPrefix = "system:bootstrap:"
// userNameSignatureSA is the other accepted prefix of a CSR's
// Spec.Username.
userNameSignatureSA = "system:serviceaccount:open-cluster-management:agent-registration-bootstrap"
// groupNameSA is the other group accepted in a CSR's Spec.Groups.
groupNameSA = "system:serviceaccounts:open-cluster-management"
// clusterLabel is the label key whose value is matched against the
// cluster name when listing CSRs.
clusterLabel = "open-cluster-management.io/cluster-name"
)
// complete copies the (unsorted) list of all clusters named in the cluster
// options into o.Values.Clusters and logs the effective options at verbosity
// level 1. cmd and args are not used. It always returns nil.
func (o *Options) complete(cmd *cobra.Command, args []string) (err error) {
	selected := o.ClusterOptions.AllClusters()
	o.Values.Clusters = selected.UnsortedList()

	klog.V(1).InfoS(
		"accept options:",
		"dry-run", o.ClusteradmFlags.DryRun,
		"clusters", o.Values.Clusters,
		"wait", o.Wait,
	)

	return nil
}
// Validate runs the hub flag validation followed by the cluster option
// validation. It returns the first error encountered, or nil when both pass;
// the cluster options are not validated if the hub flags are invalid.
func (o *Options) Validate() error {
	if hubErr := o.ClusteradmFlags.ValidateHub(); hubErr != nil {
		return hubErr
	}
	return o.ClusterOptions.Validate()
}
// Run creates the Kubernetes client set and the cluster client set from the
// kubectl factory held in the clusteradm flags, then hands both to
// runWithClient. Any error while building a client is returned as-is.
func (o *Options) Run() error {
	factory := o.ClusteradmFlags.KubectlFactory

	kubeClient, err := factory.KubernetesClientSet()
	if err != nil {
		return err
	}

	// The cluster client is built from the same REST configuration the
	// factory exposes.
	cfg, err := factory.ToRESTConfig()
	if err != nil {
		return err
	}

	clusterClient, err := clusterclientset.NewForConfig(cfg)
	if err != nil {
		return err
	}

	return o.runWithClient(kubeClient, clusterClient)
}
// runWithClient accepts every cluster listed in o.Values.Clusters with the
// given clients and returns the aggregate of the per-cluster errors (nil when
// there are none). A failure on one cluster does not stop the others.
//
// Without o.Wait each cluster is attempted once: an error from accept is
// reported as-is, and a run that finishes without error but without any
// approved CSR is reported as "no csr is approved yet".
//
// With o.Wait, accept is retried every second until it reports an approved
// CSR, or until o.ClusteradmFlags.Timeout seconds have elapsed. Errors seen
// while nothing is approved yet (and NotFound errors) are treated as
// transient; if polling ends unsuccessfully, the last such error is attached
// to the returned error so the underlying cause is not lost.
func (o *Options) runWithClient(kubeClient *kubernetes.Clientset, clusterClient *clusterclientset.Clientset) error {
	var errs []error

	for _, clusterName := range o.Values.Clusters {
		if !o.Wait {
			approved, err := o.accept(kubeClient, clusterClient, clusterName, false)
			switch {
			case err != nil:
				// The real failure is enough; do not pile a generic
				// "not approved" error on top of it.
				errs = append(errs, err)
			case !approved:
				errs = append(errs, fmt.Errorf("no csr is approved yet for cluster %s", clusterName))
			}
			continue
		}

		// lastErr holds the error swallowed by the most recent poll
		// iteration, if any. It is reset on every attempt so that it is
		// only non-nil when polling stopped right after a swallowed error.
		var lastErr error
		timeout := time.Duration(o.ClusteradmFlags.Timeout) * time.Second
		pollErr := wait.PollUntilContextTimeout(context.TODO(), 1*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
			lastErr = nil
			approved, err := o.accept(kubeClient, clusterClient, clusterName, true)
			if !approved || errors.IsNotFound(err) {
				// Keep polling: the CSR or the ManagedCluster may
				// simply not exist yet.
				lastErr = err
				return false, nil
			}
			return true, err
		})
		if pollErr == nil {
			continue
		}
		if lastErr != nil {
			// %w keeps the poll error (e.g. context deadline exceeded)
			// matchable with errors.Is.
			pollErr = fmt.Errorf("%w: last error for cluster %s: %v", pollErr, clusterName, lastErr)
		}
		errs = append(errs, pollErr)
	}

	return utilerrors.NewAggregate(errs)
}
// accept performs the hub-side acceptance of a single managed cluster: it
// approves the cluster's pending CSRs (unless the cluster registered through
// awsirsa) and then sets hubAcceptsClient on the ManagedCluster.
//
// The returned bool reports whether at least one CSR is approved (it is
// always true for awsirsa clusters, which have no CSR). waitMode is passed
// through to approveCSR. Errors from fetching the ManagedCluster or approving
// CSRs are wrapped with %w so callers can still inspect the underlying API
// error; an error from updateManagedCluster is returned unwrapped.
//
// The "joined the Hub successfully" message is printed whenever
// updateManagedCluster succeeds, independently of the returned bool.
func (o *Options) accept(kubeClient *kubernetes.Clientset, clusterClient *clusterclientset.Clientset, clusterName string, waitMode bool) (bool, error) {
	managedCluster, err := clusterClient.ClusterV1().ManagedClusters().Get(context.TODO(),
		clusterName,
		metav1.GetOptions{})
	if err != nil {
		return false, fmt.Errorf("fail to get managedcluster %s: %w", clusterName, err)
	}

	// When a managed cluster registers with the hub using awsirsa
	// registration-auth, it adds this annotation to the ManagedCluster
	// resource; its presence is used to decide the requested authentication
	// type. awsirsa authentication doesn't create a CSR on the hub, hence
	// there is nothing to approve.
	_, hasEksArn := managedCluster.Annotations["agent.open-cluster-management.io/managed-cluster-arn"]

	approved := true
	if !hasEksArn {
		approved, err = o.approveCSR(kubeClient, clusterName, waitMode)
		if err != nil {
			return approved, fmt.Errorf("fail to approve the csr for cluster %s: %w", clusterName, err)
		}
	}

	if err := o.updateManagedCluster(clusterClient, clusterName); err != nil {
		return approved, err
	}

	fmt.Fprintf(o.Streams.Out, "\n Your managed cluster %s has joined the Hub successfully. Visit https://open-cluster-management.io/scenarios or https://github.com/open-cluster-management-io/OCM/tree/main/solutions for next steps.\n", clusterName)
	return approved, nil
}
// approveCSR approves the pending CertificateSigningRequests labelled with
// clusterName.
//
// Unless o.SkipApproveCheck is set, only CSRs whose username and groups match
// the registration agent identities are considered, and each CSR's common
// name is treated as its requester. When more than one requester is found,
// approval is skipped unless o.Requesters names the requesters to approve;
// with a single requester its CSRs are always approved.
//
// The returned bool is true when at least one considered CSR is already
// approved or was approved by this call. In dry-run mode nothing is updated.
// When waitMode is true a notice is printed if there is nothing to approve.
// Errors from individual approvals are aggregated; a failure to list CSRs is
// returned directly.
func (o *Options) approveCSR(kubeClient *kubernetes.Clientset, clusterName string, waitMode bool) (bool, error) {
	var hasApproved bool

	csrClient := kubeClient.CertificatesV1().CertificateSigningRequests()
	csrs, err := csrClient.List(context.TODO(),
		metav1.ListOptions{
			LabelSelector: fmt.Sprintf("%v = %v", clusterLabel, clusterName),
		})
	if err != nil {
		return hasApproved, err
	}

	// Check if csr has the correct requester.
	var passedCSRs []certificatesv1.CertificateSigningRequest
	csrRequesterMapper := map[string]string{}
	requesters := sets.New[string]()
	if o.SkipApproveCheck {
		passedCSRs = csrs.Items
	} else {
		for _, item := range csrs.Items {
			// Does not have the correct name prefix.
			if !strings.HasPrefix(item.Spec.Username, userNameSignatureBootstrapPrefix) &&
				!strings.HasPrefix(item.Spec.Username, userNameSignatureSA) {
				continue
			}

			// Check groups.
			groups := sets.New[string](item.Spec.Groups...)
			if !groups.Has(groupNameBootstrap) &&
				!groups.Has(groupNameSA) {
				continue
			}

			passedCSRs = append(passedCSRs, item)

			// Parse the common name in the request. A CSR that cannot be
			// parsed stays in passedCSRs with an empty requester, so it is
			// reported below as not being in the approve list.
			cn, err := parseCSRCommonName(item.Spec.Request)
			if err != nil {
				fmt.Fprintf(o.Streams.ErrOut, "csr %s is not valid: %v\n", item.Name, err)
				continue
			}
			requesters.Insert(cn)
			csrRequesterMapper[item.Name] = cn
		}
	}

	// If there are multiple CSRs with different common names, it is possible
	// that multiple agents registered with the same cluster name. Stop here
	// unless the user specified which requesters to approve (or enabled
	// skip-approve-check).
	requiredRequesters := sets.New[string](o.Requesters...)
	if requesters.Len() > 1 {
		// requesters is only populated when SkipApproveCheck is false, so
		// the two conditions must be combined with &&; with || the branch
		// was always taken and --requesters could never take effect.
		if requiredRequesters.Len() == 0 && !o.SkipApproveCheck {
			fmt.Fprintf(o.Streams.Out, "There are CSRs of different requesters: %s, approve is skipped "+
				"please specify the certain requesters with --requesters or set --skip-approve-check if "+
				"all CSRs need to be approved\n", strings.Join(requesters.UnsortedList(), ","))
			return false, nil
		}
	} else {
		// Always approve if there is only one requester.
		requiredRequesters = requiredRequesters.Union(requesters)
	}
	filteredRequesters := requesters.Intersection(requiredRequesters)

	// Collect all CSRs that are neither approved nor denied yet.
	var csrToApprove []certificatesv1.CertificateSigningRequest
	for _, passedCSR := range passedCSRs {
		cn := csrRequesterMapper[passedCSR.Name]
		if !o.SkipApproveCheck && !filteredRequesters.Has(cn) {
			fmt.Fprintf(o.Streams.Out, "CSR %s with requester %s is not in the approve list\n", passedCSR.Name, cn)
			continue
		}

		// Check if already approved or denied.
		approved, denied := GetCertApprovalCondition(&passedCSR.Status)

		// If already denied, then nothing to do.
		if denied {
			fmt.Fprintf(o.Streams.Out, "CSR %s already denied\n", passedCSR.Name)
			continue
		}

		// If already approved, then nothing to do.
		if approved {
			fmt.Fprintf(o.Streams.Out, "CSR %s already approved\n", passedCSR.Name)
			hasApproved = true
			continue
		}

		csrToApprove = append(csrToApprove, passedCSR)
	}

	// No csr found.
	if len(csrToApprove) == 0 {
		if waitMode {
			fmt.Fprintf(o.Streams.Out, "no CSR to approve for cluster %s\n", clusterName)
		}
		return hasApproved, nil
	}

	// If dry-run, don't approve.
	if o.ClusteradmFlags.DryRun {
		return hasApproved, nil
	}

	var errs []error
	fmt.Fprintf(o.Streams.Out, "Starting approve csrs for the cluster %s\n", clusterName)
	for i := range csrToApprove {
		csr := &csrToApprove[i]

		// append works on a nil slice, so no explicit initialisation of
		// Conditions is needed.
		csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1.CertificateSigningRequestCondition{
			Status:         corev1.ConditionTrue,
			Type:           certificatesv1.CertificateApproved,
			Reason:         fmt.Sprintf("%s Approve", helpers.GetExampleHeader()),
			Message:        fmt.Sprintf("This CSR was approved by %s certificate approve.", helpers.GetExampleHeader()),
			LastUpdateTime: metav1.Now(),
		})

		if _, err := csrClient.UpdateApproval(context.TODO(), csr.Name, csr, metav1.UpdateOptions{}); err != nil {
			errs = append(errs, err)
			continue
		}
		fmt.Fprintf(o.Streams.Out, "CSR %s approved\n", csr.Name)
		hasApproved = true
	}

	return hasApproved, utilerrors.NewAggregate(errs)
}
// updateManagedCluster makes sure spec.hubAcceptsClient is true on the named
// ManagedCluster. If the field is already true it only prints a notice; in
// dry-run mode it makes no change. Otherwise it sends a merge patch that sets
// the field and prints a confirmation. Errors from the Get or Patch call are
// returned unchanged.
func (o *Options) updateManagedCluster(clusterClient *clusterclientset.Clientset, clusterName string) error {
	managedClusters := clusterClient.ClusterV1().ManagedClusters()

	mc, err := managedClusters.Get(context.TODO(), clusterName, metav1.GetOptions{})
	if err != nil {
		return err
	}

	switch {
	case mc.Spec.HubAcceptsClient:
		fmt.Fprintf(o.Streams.Out, "hubAcceptsClient already set for managed cluster %s\n", clusterName)
		return nil
	case o.ClusteradmFlags.DryRun:
		return nil
	}

	// From here on hubAcceptsClient is known to be false and changes are
	// allowed.
	const acceptPatch = `{"spec":{"hubAcceptsClient":true}}`
	if _, err = managedClusters.Patch(context.TODO(), mc.Name, types.MergePatchType, []byte(acceptPatch), metav1.PatchOptions{}); err != nil {
		return err
	}

	fmt.Fprintf(o.Streams.Out, "set hubAcceptsClient to true for managed cluster %s\n", clusterName)
	return nil
}
// GetCertApprovalCondition scans the conditions of a CSR status and reports
// whether a condition of type Approved and/or of type Denied is present.
// Only the condition type is inspected; both results are false when neither
// type appears.
func GetCertApprovalCondition(status *certificatesv1.CertificateSigningRequestStatus) (approved bool, denied bool) {
	for i := range status.Conditions {
		switch status.Conditions[i].Type {
		case certificatesv1.CertificateApproved:
			approved = true
		case certificatesv1.CertificateDenied:
			denied = true
		}
	}
	return approved, denied
}
// parseCSRCommonName decodes the first PEM block in csr, which must be of
// type "CERTIFICATE REQUEST", parses it as an x509 certificate request and
// returns the subject common name. An error is returned when no such PEM
// block is found or when the request cannot be parsed.
func parseCSRCommonName(csr []byte) (string, error) {
	const csrBlockType = "CERTIFICATE REQUEST"

	pemBlock, _ := pem.Decode(csr)
	if pemBlock == nil {
		return "", fmt.Errorf("CSR was not recognized: PEM block type is not CERTIFICATE REQUEST")
	}
	if pemBlock.Type != csrBlockType {
		return "", fmt.Errorf("CSR was not recognized: PEM block type is not CERTIFICATE REQUEST")
	}

	request, parseErr := x509.ParseCertificateRequest(pemBlock.Bytes)
	if parseErr != nil {
		return "", fmt.Errorf("CSR was not recognized: %v", parseErr)
	}

	return request.Subject.CommonName, nil
}