Skip to content

Commit 179e643

Browse files
committed
Refactor command parameters initialize
1 parent 20eca93 commit 179e643

File tree

3 files changed

+281
-66
lines changed

3 files changed

+281
-66
lines changed

cmd/csi-attacher/main.go

Lines changed: 38 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/kubernetes-csi/csi-lib-utils/metrics"
4444
"github.com/kubernetes-csi/csi-lib-utils/rpc"
4545
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
46+
cf "github.com/kubernetes-csi/external-attacher/pkg/commandflags"
4647
"github.com/kubernetes-csi/external-attacher/pkg/controller"
4748
"google.golang.org/grpc"
4849
)
@@ -53,82 +54,53 @@ const (
5354
csiTimeout = time.Second
5455
)
5556

56-
// Command line flags
57-
var (
58-
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
59-
resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.")
60-
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
61-
showVersion = flag.Bool("version", false, "Show version.")
62-
timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.")
63-
workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads")
64-
maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.")
65-
66-
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.")
67-
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.")
68-
69-
enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
70-
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
71-
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
72-
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
73-
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
74-
75-
defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string")
76-
77-
reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.")
78-
79-
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
80-
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
81-
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
82-
83-
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
84-
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
85-
86-
maxGRPCLogLength = flag.Int("max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit")
87-
)
88-
8957
var (
9058
version = "unknown"
9159
)
9260

9361
func main() {
9462
fg := featuregate.NewFeatureGate()
9563
logsapi.AddFeatureGates(fg)
64+
cf.InitCommonFlags()
65+
acf := cf.NewAttacherCommandFlags()
9666
c := logsapi.NewLoggingConfiguration()
9767
logsapi.AddGoFlags(c, flag.CommandLine)
9868
logs.InitLogs()
9969
flag.Parse()
70+
acf.MergeFlags()
10071
logger := klog.Background()
10172
if err := logsapi.ValidateAndApply(c, fg); err != nil {
10273
logger.Error(err, "LoggingConfiguration is invalid")
10374
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
10475
}
10576

106-
if *showVersion {
77+
if cf.ShowVersion {
10778
fmt.Println(os.Args[0], version)
10879
return
10980
}
11081
logger.Info("Version", "version", version)
82+
logger.Info("Timeout", "timeout", cf.Timeout)
11183

112-
if *metricsAddress != "" && *httpEndpoint != "" {
84+
if cf.MetricsAddress != "" && cf.HttpEndpoint != "" {
11385
logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set")
11486
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
11587
}
116-
addr := *metricsAddress
88+
addr := cf.MetricsAddress
11789
if addr == "" {
118-
addr = *httpEndpoint
90+
addr = cf.HttpEndpoint
11991
}
12092

12193
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
122-
config, err := buildConfig(*kubeconfig)
94+
config, err := buildConfig(cf.Kubeconfig)
12395
if err != nil {
12496
logger.Error(err, "Failed to build a Kubernetes config")
12597
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
12698
}
127-
config.QPS = (float32)(*kubeAPIQPS)
128-
config.Burst = *kubeAPIBurst
99+
config.QPS = (float32)(cf.KubeAPIQPS)
100+
config.Burst = cf.KubeAPIBurst
129101
config.ContentType = runtime.ContentTypeProtobuf
130102

131-
if *workerThreads == 0 {
103+
if cf.WorkerThreads == 0 {
132104
logger.Error(nil, "Option -worker-threads must be greater than zero")
133105
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
134106
}
@@ -139,20 +111,20 @@ func main() {
139111
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
140112
}
141113

142-
factory := informers.NewSharedInformerFactory(clientset, *resync)
114+
factory := informers.NewSharedInformerFactory(clientset, cf.Resync)
143115
var handler controller.Handler
144116
metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
145117

146118
// Connect to CSI.
147-
connection.SetMaxGRPCLogLength(*maxGRPCLogLength)
119+
connection.SetMaxGRPCLogLength(cf.MaxGRPCLogLength)
148120
ctx := context.Background()
149-
csiConn, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
121+
csiConn, err := connection.Connect(ctx, cf.CsiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
150122
if err != nil {
151-
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress)
123+
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", cf.CsiAddress)
152124
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
153125
}
154126

155-
err = rpc.ProbeForever(ctx, csiConn, *timeout)
127+
err = rpc.ProbeForever(ctx, csiConn, cf.Timeout)
156128
if err != nil {
157129
logger.Error(err, "Failed to probe the CSI driver")
158130
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
@@ -173,15 +145,15 @@ func main() {
173145
translator := csitrans.New()
174146
if translator.IsMigratedCSIDriverByName(csiAttacher) {
175147
metricsManager = metrics.NewCSIMetricsManagerWithOptions(csiAttacher, metrics.WithMigration())
176-
migratedCsiClient, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
148+
migratedCsiClient, err := connection.Connect(ctx, cf.CsiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
177149
if err != nil {
178-
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress, "migrated", true)
150+
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", cf.CsiAddress, "migrated", true)
179151
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
180152
}
181153
csiConn.Close()
182154
csiConn = migratedCsiClient
183155

184-
err = rpc.ProbeForever(ctx, csiConn, *timeout)
156+
err = rpc.ProbeForever(ctx, csiConn, cf.Timeout)
185157
if err != nil {
186158
logger.Error(err, "Failed to probe the CSI driver", "migrated", true)
187159
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
@@ -191,13 +163,13 @@ func main() {
191163
// Prepare http endpoint for metrics + leader election healthz
192164
mux := http.NewServeMux()
193165
if addr != "" {
194-
metricsManager.RegisterToServer(mux, *metricsPath)
166+
metricsManager.RegisterToServer(mux, cf.MetricsPath)
195167
metricsManager.SetDriverName(csiAttacher)
196168
go func() {
197-
logger.Info("ServeMux listening", "address", addr, "metricsPath", *metricsPath)
169+
logger.Info("ServeMux listening", "address", addr, "metricsPath", cf.MetricsPath)
198170
err := http.ListenAndServe(addr, mux)
199171
if err != nil {
200-
logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", *metricsPath)
172+
logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", cf.MetricsPath)
201173
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
202174
}
203175
}()
@@ -233,7 +205,7 @@ func main() {
233205
vaLister := factory.Storage().V1().VolumeAttachments().Lister()
234206
csiNodeLister := factory.Storage().V1().CSINodes().Lister()
235207
volAttacher := attacher.NewAttacher(csiConn)
236-
CSIVolumeLister := attacher.NewVolumeLister(csiConn, *maxEntries)
208+
CSIVolumeLister := attacher.NewVolumeLister(csiConn, cf.MaxEntries)
237209
handler = controller.NewCSIHandler(
238210
clientset,
239211
csiAttacher,
@@ -242,11 +214,11 @@ func main() {
242214
pvLister,
243215
csiNodeLister,
244216
vaLister,
245-
timeout,
217+
&cf.Timeout,
246218
supportsReadOnly,
247219
supportsSingleNodeMultiWriter,
248220
csitrans.New(),
249-
*defaultFSType,
221+
cf.DefaultFSType,
250222
)
251223
logger.V(2).Info("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
252224
} else {
@@ -266,19 +238,19 @@ func main() {
266238
handler,
267239
factory.Storage().V1().VolumeAttachments(),
268240
factory.Core().V1().PersistentVolumes(),
269-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
270-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
241+
workqueue.NewItemExponentialFailureRateLimiter(cf.RetryIntervalStart, cf.RetryIntervalMax),
242+
workqueue.NewItemExponentialFailureRateLimiter(cf.RetryIntervalStart, cf.RetryIntervalMax),
271243
supportsListVolumesPublishedNodes,
272-
*reconcileSync,
244+
cf.ReconcileSync,
273245
)
274246

275247
run := func(ctx context.Context) {
276248
stopCh := ctx.Done()
277249
factory.Start(stopCh)
278-
ctrl.Run(ctx, int(*workerThreads))
250+
ctrl.Run(ctx, int(cf.WorkerThreads))
279251
}
280252

281-
if !*enableLeaderElection {
253+
if !cf.EnableLeaderElection {
282254
run(klog.NewContext(context.Background(), logger))
283255
} else {
284256
// Create a new clientset for leader election. When the attacher
@@ -293,17 +265,17 @@ func main() {
293265
// Name of config map with leader election lock
294266
lockName := "external-attacher-leader-" + csiAttacher
295267
le := leaderelection.NewLeaderElection(leClientset, lockName, run)
296-
if *httpEndpoint != "" {
268+
if cf.HttpEndpoint != "" {
297269
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
298270
}
299271

300-
if *leaderElectionNamespace != "" {
301-
le.WithNamespace(*leaderElectionNamespace)
272+
if cf.LeaderElectionNamespace != "" {
273+
le.WithNamespace(cf.LeaderElectionNamespace)
302274
}
303275

304-
le.WithLeaseDuration(*leaderElectionLeaseDuration)
305-
le.WithRenewDeadline(*leaderElectionRenewDeadline)
306-
le.WithRetryPeriod(*leaderElectionRetryPeriod)
276+
le.WithLeaseDuration(cf.LeaderElectionLeaseDuration)
277+
le.WithRenewDeadline(cf.LeaderElectionRenewDeadline)
278+
le.WithRetryPeriod(cf.LeaderElectionRetryPeriod)
307279

308280
if err := le.Run(); err != nil {
309281
logger.Error(err, "Failed to initialize leader election")

0 commit comments

Comments
 (0)