Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 28 additions & 69 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,38 +79,22 @@ import (

var (
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for volume operation (creation, deletion, capacity queries)")

enableLeaderElection = flag.Bool("leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")

leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
strictTopology = flag.Bool("strict-topology", false, "Late binding: pass only selected node topology to CreateVolume Request, unlike default behavior of passing aggregated cluster topologies that match with topology keys of the selected node.")
immediateTopology = flag.Bool("immediate-topology", true, "Immediate binding: pass aggregated cluster topologies for all nodes where the CSI driver is available (enabled, the default) or no topology requirements (if disabled).")
extraCreateMetadata = flag.Bool("extra-create-metadata", false, "If set, add pv/pvc metadata to plugin create requests as parameters.")
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including pprof, metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
enableProfile = flag.Bool("enable-pprof", false, "Enable pprof profiling on the TCP network address specified by --http-endpoint. The HTTP path is `/debug/pprof/`.")

leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
strictTopology = flag.Bool("strict-topology", false, "Late binding: pass only selected node topology to CreateVolume Request, unlike default behavior of passing aggregated cluster topologies that match with topology keys of the selected node.")
immediateTopology = flag.Bool("immediate-topology", true, "Immediate binding: pass aggregated cluster topologies for all nodes where the CSI driver is available (enabled, the default) or no topology requirements (if disabled).")
extraCreateMetadata = flag.Bool("extra-create-metadata", false, "If set, add pv/pvc metadata to plugin create requests as parameters.")
enableProfile = flag.Bool("enable-pprof", false, "Enable pprof profiling on the TCP network address specified by --http-endpoint. The HTTP path is `/debug/pprof/`.")

defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to provision when fstype is unspecified in the StorageClass. If the default is not set and fstype is unset in the StorageClass, then no fstype will be set")

kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

kubeAPICapacityQPS = flag.Float32("kube-api-capacity-qps", 1, "QPS to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 1.0.")
kubeAPICapacityBurst = flag.Int("kube-api-capacity-burst", 5, "Burst to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 5.")

Expand Down Expand Up @@ -144,6 +128,7 @@ func main() {
c := logsapi.NewLoggingConfiguration()
logsapi.AddFlags(c, flag.CommandLine)
logs.InitLogs()
standardflags.RegisterCommonFlags(goflag.CommandLine)
standardflags.AddAutomaxprocs(klog.Infof)
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
flag.Parse()
Expand All @@ -163,32 +148,32 @@ func main() {
klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
}

if *showVersion {
if standardflags.Configuration.ShowVersion {
fmt.Println(os.Args[0], version)
os.Exit(0)
}
klog.InfoS("Version", "version", version)

if *metricsAddress != "" && *httpEndpoint != "" {
if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" {
klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")
os.Exit(1)
}
addr := *metricsAddress
addr := standardflags.Configuration.MetricsAddress
if addr == "" {
addr = *httpEndpoint
addr = standardflags.Configuration.HttpEndpoint
}

// get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv("KUBECONFIG")

if kubeconfigEnv != "" {
klog.Infof("Found KUBECONFIG environment variable set, using that..")
kubeconfig = &kubeconfigEnv
standardflags.Configuration.KubeConfig = kubeconfigEnv
}

if *master != "" || *kubeconfig != "" {
if *master != "" || standardflags.Configuration.KubeConfig != "" {
klog.Infof("Either master or kubeconfig specified. building kube config from that..")
config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
config, err = clientcmd.BuildConfigFromFlags(*master, standardflags.Configuration.KubeConfig)
} else {
klog.Infof("Building kube configs for running in cluster...")
config, err = rest.InClusterConfig()
Expand All @@ -197,8 +182,8 @@ func main() {
klog.Fatalf("Failed to create config: %v", err)
}

config.QPS = *kubeAPIQPS
config.Burst = *kubeAPIBurst
config.QPS = float32(standardflags.Configuration.KubeAPIQPS)
config.Burst = standardflags.Configuration.KubeAPIBurst

coreConfig := rest.CopyConfig(config)
coreConfig.ContentType = runtime.ContentTypeProtobuf
Expand Down Expand Up @@ -228,7 +213,7 @@ func main() {
metrics.WithSubsystem(metrics.SubsystemSidecar),
)

grpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
grpcClient, err := ctrl.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand Down Expand Up @@ -262,7 +247,7 @@ func main() {
// Will be provided via default gatherer.
metrics.WithProcessStartTime(false),
metrics.WithMigration())
migratedGrpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
migratedGrpcClient, err := ctrl.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand Down Expand Up @@ -601,14 +586,14 @@ func main() {
// because both CSI metrics manager and component-base manage
// their own registry. Probably could be avoided by making
// CSI metrics manager a bit more flexible.
mux.Handle(*metricsPath,
mux.Handle(standardflags.Configuration.MetricsPath,
promhttp.InstrumentMetricHandler(
reg,
promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{}),
))

if *enableProfile {
klog.InfoS("Starting profiling", "endpoint", httpEndpoint)
klog.InfoS("Starting profiling", "endpoint", standardflags.Configuration.HttpEndpoint)

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand All @@ -620,7 +605,7 @@ func main() {
klog.Infof("ServeMux listening at %q", addr)
err := http.ListenAndServe(addr, mux)
if err != nil {
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, standardflags.Configuration.MetricsPath, err)
}
}()
}
Expand Down Expand Up @@ -722,39 +707,13 @@ func main() {
}
}

if !*enableLeaderElection {
run(ctx)
} else {
// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
// to preserve backwards compatibility
lockName := strings.Replace(provisionerName, "/", "-", -1)

// create a new clientset for leader election
leClientset, err := kubernetes.NewForConfig(coreConfig)
if err != nil {
klog.Fatalf("Failed to create leaderelection client: %v", err)
}

le := leaderelection.NewLeaderElection(leClientset, lockName, run)
if *httpEndpoint != "" {
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
}

if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}

le.WithLeaseDuration(*leaderElectionLeaseDuration)
le.WithRenewDeadline(*leaderElectionRenewDeadline)
le.WithRetryPeriod(*leaderElectionRetryPeriod)
le.WithIdentity(identity)
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
le.WithReleaseOnCancel(true)
le.WithContext(ctx)
}

if err := le.Run(); err != nil {
klog.Fatalf("failed to initialize leader election: %v", err)
}
}
leaderelection.RunWithLeaderElection(
ctx,
config,
standardflags.Configuration,
run,
strings.Replace(provisionerName, "/", "-", -1),
mux,
utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit),
)
}