Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| controller.workqueueRateLimiter.bucketSize | int | `500` | Specifies the maximum number of items that can be in the workqueue at any given time. |
| controller.workqueueRateLimiter.maxDelay.enable | bool | `true` | Specifies whether to enable max delay for the workqueue rate limiter. This is useful to avoid losing events when the workqueue is full. |
| controller.workqueueRateLimiter.maxDelay.duration | string | `"6h"` | Specifies the maximum delay duration for the workqueue rate limiter. |
| controller.submitter.type | string | `"default"` | Type of submitter to use: 'default' or 'grpc'. |
| controller.submitter.grpcServerAddress | string | `"localhost:50051"` | gRPC server address for alternate Spark submit (used when submitter.type is 'grpc'). |
| controller.submitter.grpcSubmitTimeout | string | `"10s"` | Timeout for gRPC Spark submit (used when submitter.type is 'grpc'). |
| webhook.enable | bool | `true` | Specifies whether to enable webhook. |
| webhook.replicas | int | `1` | Number of replicas of webhook server. |
| webhook.leaderElection.enable | bool | `true` | Specifies whether to enable leader election for webhook. |
Expand Down
38 changes: 38 additions & 0 deletions charts/spark-operator-chart/templates/controller/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ spec:
{{- if .Values.controller.maxTrackedExecutorPerApp }}
- --max-tracked-executor-per-app={{ .Values.controller.maxTrackedExecutorPerApp }}
{{- end }}
{{- if .Values.controller.submitter.type }}
- --submitter-type={{ .Values.controller.submitter.type }}
{{- end }}
{{- if .Values.controller.submitter.grpcServerAddress }}
- --grpc-server-address={{ .Values.controller.submitter.grpcServerAddress }}
{{- end }}
{{- if .Values.controller.submitter.grpcSubmitTimeout }}
- --grpc-submit-timeout={{ .Values.controller.submitter.grpcSubmitTimeout }}
{{- end }}
{{- if or .Values.prometheus.metrics.enable .Values.controller.pprof.enable }}
ports:
{{- if .Values.controller.pprof.enable }}
Expand Down Expand Up @@ -161,6 +170,35 @@ spec:
securityContext:
{{- toYaml . | nindent 10 }}
{{- end }}
{{- if .Values.controller.submitter.nativeSubmitPlugin.enabled }}
- name: native-submit
image: {{ .Values.controller.submitter.nativeSubmitPlugin.image }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.imagePullPolicy }}
imagePullPolicy: {{ . }}
{{- end }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.ports }}
ports:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.livenessProbe }}
livenessProbe:
{{- toYaml . | nindent 10 }}
{{- end }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.readinessProbe }}
readinessProbe:
{{- toYaml . | nindent 10 }}
{{- end }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.resources }}
resources:
{{- toYaml . | nindent 10 }}
{{- end }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.terminationMessagePath }}
terminationMessagePath: {{ . }}
{{- end }}
{{- with .Values.controller.submitter.nativeSubmitPlugin.terminationMessagePolicy }}
terminationMessagePolicy: {{ . }}
{{- end }}
{{- end }}
{{- with .Values.controller.sidecars }}
{{- toYaml . | nindent 6 }}
{{- end }}
Expand Down
68 changes: 68 additions & 0 deletions charts/spark-operator-chart/tests/controller/deployment_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,71 @@ tests:
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --max-tracked-executor-per-app=123

- it: Should add native-submit sidecar container if `controller.submitter.nativeSubmitPlugin.enabled` is set to `true`
set:
controller:
submitter:
nativeSubmitPlugin:
enabled: true
image: "docker.io/library/native-submit-plugin:08142025"
imagePullPolicy: "IfNotPresent"
ports:
- containerPort: 50051
name: metrics
protocol: TCP
livenessProbe:
failureThreshold: 3
httpGet:
path: /healthz
port: 9090
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
readinessProbe:
failureThreshold: 3
httpGet:
path: /readyz
port: 9090
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 50m
memory: 64Mi
asserts:
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].name
value: native-submit
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].image
value: docker.io/library/native-submit-plugin:08142025
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].imagePullPolicy
value: IfNotPresent
- contains:
path: spec.template.spec.containers[?(@.name=="native-submit")].ports
content:
name: metrics
containerPort: 50051
protocol: TCP
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].livenessProbe.httpGet.port
value: 9090
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].livenessProbe.httpGet.path
value: /healthz
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].readinessProbe.httpGet.port
value: 9090
- equal:
path: spec.template.spec.containers[?(@.name=="native-submit")].readinessProbe.httpGet.path
value: /readyz
55 changes: 55 additions & 0 deletions charts/spark-operator-chart/values-grpc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Example values file for gRPC submitter with native submit plugin sidecar
image:
repository: docker.io/library/spark-operator
tag: dev
pullPolicy: IfNotPresent
controller:
submitter:
# Enable gRPC submitter
type: "grpc"
# gRPC server address (points to the sidecar container)
grpcServerAddress: "localhost:50051"
# Timeout for gRPC Spark submit
grpcSubmitTimeout: "10s"
# Enable native submit plugin sidecar
nativeSubmitPlugin:
enabled: true
image: "docker.io/library/native-submit-plugin:08142025"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find this image. Could you provide more details about the implementation?

imagePullPolicy: "IfNotPresent"
port: 50051
portName: "grpc"
# Container ports configuration
ports:
- containerPort: 50051
name: metrics
protocol: TCP
# Liveness probe configuration
livenessProbe:
failureThreshold: 3
httpGet:
path: /healthz
port: 9090
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
# Readiness probe configuration
readinessProbe:
failureThreshold: 3
httpGet:
path: /readyz
port: 9090
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
# Container termination configuration
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 50m
memory: 64Mi
58 changes: 58 additions & 0 deletions charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,64 @@

# -- Sidecar containers for controller pods.
sidecars: []

# -- Spark submitter configuration
submitter:
# -- Type of submitter to use: 'default' or 'grpc'
type: "default"
# -- gRPC server address for alternate Spark submit (used when submitter.type is 'grpc')
# When using native submit plugin sidecar, this should be 'localhost:50051'
grpcServerAddress: "localhost:50051"
# -- Timeout for gRPC Spark submit (used when submitter.type is 'grpc')
grpcSubmitTimeout: "10s"
# -- Native submit plugin sidecar configuration
nativeSubmitPlugin:
# -- Enable native submit plugin sidecar
enabled: false
# -- Native submit plugin image
image: "docker.io/library/native-submit-plugin:08142025"

Check failure on line 216 in charts/spark-operator-chart/values.yaml

View workflow job for this annotation

GitHub Actions / build-helm-chart

216:1 [trailing-spaces] trailing spaces
# -- Native submit plugin image pull policy
imagePullPolicy: "IfNotPresent"
# -- Native submit plugin container port
port: 50051
# -- Native submit plugin container port name
portName: "grpc"
# -- Container ports configuration
ports:
- containerPort: 50051
name: metrics
protocol: TCP
# -- Liveness probe configuration
livenessProbe:
failureThreshold: 3
httpGet:
path: /healthz
port: 9090
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
# -- Readiness probe configuration
readinessProbe:
failureThreshold: 3
httpGet:
path: /readyz
port: 9090
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
# -- Container termination configuration
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
# -- Resource requests and limits for native submit plugin
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 50m
# memory: 64Mi

# Pod disruption budget for controller to avoid service degradation.
podDisruptionBudget:
Expand Down
23 changes: 22 additions & 1 deletion cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ var (
zapOptions = logzap.Options{}
)

// Submitter-related command-line settings. They are bound to flags in
// NewStartCommand and read in start() to choose the SparkApplication submitter.
var (
// submitterType is set by --submitter-type (flag default "default").
// start() selects the gRPC submitter when it is "grpc"; any other value
// falls through to the default SparkSubmitter.
submitterType string
// grpcServerAddress is set by --grpc-server-address (flag default
// "localhost:50051") and is passed to NewGRPCSparkSubmitter.
grpcServerAddress string
// grpcSubmitTimeout is set by --grpc-submit-timeout (flag default 10s)
// and is passed to NewGRPCSparkSubmitter.
grpcSubmitTimeout time.Duration
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(schedulingv1alpha1.AddToScheme(scheme))
Expand Down Expand Up @@ -204,6 +210,10 @@ func NewStartCommand() *cobra.Command {
command.Flags().StringVar(&pprofBindAddress, "pprof-bind-address", "0", "The address the pprof endpoint binds to. "+
"If not set, it will be 0 in order to disable the pprof server")

command.Flags().StringVar(&submitterType, "submitter-type", "default", "SparkApplication submitter type: 'default' or 'grpc'.")
command.Flags().StringVar(&grpcServerAddress, "grpc-server-address", "localhost:50051", "gRPC server address for alternate Spark submit.")
command.Flags().DurationVar(&grpcSubmitTimeout, "grpc-submit-timeout", 10*time.Second, "Timeout for gRPC Spark submit.")

flagSet := flag.NewFlagSet("controller", flag.ExitOnError)
ctrl.RegisterFlags(flagSet)
zapOptions.BindFlags(flagSet)
Expand Down Expand Up @@ -286,7 +296,15 @@ func start() {
}
}

sparkSubmitter := &sparkapplication.SparkSubmitter{}
var sparkSubmitter sparkapplication.SparkApplicationSubmitter
switch submitterType {
case "grpc":
sparkSubmitter = sparkapplication.NewGRPCSparkSubmitter(grpcServerAddress, grpcSubmitTimeout)
logger.Info("Using gRPC SparkApplication submitter", "address", grpcServerAddress, "timeout", grpcSubmitTimeout)
default:
sparkSubmitter = &sparkapplication.SparkSubmitter{}
logger.Info("Using default SparkApplication submitter")
}

// Setup controller for SparkApplication.
if err = sparkapplication.NewReconciler(
Expand Down Expand Up @@ -441,6 +459,9 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
SubmitterType: submitterType,
GRPCServerAddress: grpcServerAddress,
GRPCSubmitTimeout: grpcSubmitTimeout,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
Expand Down
1 change: 1 addition & 0 deletions config/crd/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
resources:
- bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
- bases/sparkoperator.k8s.io_sparkapplications.yaml
- bases/sparkoperator.k8s.io_sparkconnects.yaml
# +kubebuilder:scaffold:crdkustomizeresource

patches:
Expand Down
5 changes: 5 additions & 0 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type Options struct {
SparkExecutorMetrics *metrics.SparkExecutorMetrics

MaxTrackedExecutorPerApp int

// Submitter configuration
SubmitterType string // "default" or "grpc"
GRPCServerAddress string // gRPC server address, e.g., "localhost:50051"
GRPCSubmitTimeout time.Duration // gRPC submit timeout, e.g., 10 * time.Second
}

// Reconciler reconciles a SparkApplication object.
Expand Down
Loading
Loading