@@ -297,7 +297,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
297
297
}
298
298
app := old .DeepCopy ()
299
299
300
- _ = r .submitSparkApplication (ctx , app )
300
+ r .submitSparkApplication (ctx , app )
301
301
if err := r .updateSparkApplicationStatus (ctx , app ); err != nil {
302
302
return err
303
303
}
@@ -414,7 +414,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
414
414
}
415
415
if timeUntilNextRetryDue <= 0 {
416
416
if r .validateSparkResourceDeletion (ctx , app ) {
417
- _ = r .submitSparkApplication (ctx , app )
417
+ r .submitSparkApplication (ctx , app )
418
418
} else {
419
419
if err := r .deleteSparkResources (ctx , app ); err != nil {
420
420
logger .Error (err , "failed to delete resources associated with SparkApplication" )
@@ -497,7 +497,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
497
497
logger .Info ("Successfully deleted resources associated with SparkApplication" , "state" , app .Status .AppState .State )
498
498
r .recordSparkApplicationEvent (app )
499
499
r .resetSparkApplicationStatus (app )
500
- _ = r .submitSparkApplication (ctx , app )
500
+ r .submitSparkApplication (ctx , app )
501
501
}
502
502
if err := r .updateSparkApplicationStatus (ctx , app ); err != nil {
503
503
return err
@@ -823,7 +823,7 @@ func (r *Reconciler) reconcileResumingSparkApplication(ctx context.Context, req
823
823
824
824
r .recordSparkApplicationEvent (app )
825
825
826
- _ = r .submitSparkApplication (ctx , app )
826
+ r .submitSparkApplication (ctx , app )
827
827
if err := r .updateSparkApplicationStatus (ctx , app ); err != nil {
828
828
return err
829
829
}
@@ -874,7 +874,8 @@ func (r *Reconciler) getSparkApplication(ctx context.Context, key types.Namespac
874
874
}
875
875
876
876
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
877
- func (r * Reconciler ) submitSparkApplication (ctx context.Context , app * v1beta2.SparkApplication ) (submitErr error ) {
877
+ // The submission result are recorded in app.Status.{AppState,ExecutionAttempts}.
878
+ func (r * Reconciler ) submitSparkApplication (ctx context.Context , app * v1beta2.SparkApplication ) {
878
879
logger := log .FromContext (ctx )
879
880
logger .Info ("Submitting SparkApplication" , "state" , app .Status .AppState .State )
880
881
@@ -884,6 +885,7 @@ func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.Sp
884
885
app .Status .LastSubmissionAttemptTime = metav1 .Now ()
885
886
app .Status .SubmissionAttempts = app .Status .SubmissionAttempts + 1
886
887
888
+ var submitErr error
887
889
defer func () {
888
890
if submitErr == nil {
889
891
app .Status .AppState = v1beta2.ApplicationState {
@@ -901,21 +903,24 @@ func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.Sp
901
903
}()
902
904
903
905
if err := r .configWebUI (ctx , app ); err != nil {
904
- return fmt .Errorf ("failed to configure web UI: %v" , err )
906
+ submitErr = fmt .Errorf ("failed to configure web UI: %v" , err )
907
+ return
905
908
}
906
909
907
910
if util .PrometheusMonitoringEnabled (app ) {
908
911
logger .Info ("Configure Prometheus monitoring for SparkApplication" )
909
912
if err := configPrometheusMonitoring (ctx , app , r .client ); err != nil {
910
- return fmt .Errorf ("failed to configure Prometheus monitoring: %v" , err )
913
+ submitErr = fmt .Errorf ("failed to configure Prometheus monitoring: %v" , err )
914
+ return
911
915
}
912
916
}
913
917
914
918
// Use batch scheduler to perform scheduling task before submitting (before build command arguments).
915
919
if needScheduling , scheduler := r .shouldDoBatchScheduling (ctx , app ); needScheduling {
916
920
logger .Info ("Do batch scheduling for SparkApplication" )
917
921
if err := scheduler .Schedule (app ); err != nil {
918
- return fmt .Errorf ("failed to process batch scheduler: %v" , err )
922
+ submitErr = fmt .Errorf ("failed to process batch scheduler: %v" , err )
923
+ return
919
924
}
920
925
}
921
926
@@ -927,10 +932,9 @@ func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.Sp
927
932
928
933
if err := r .submitter .Submit (ctx , app ); err != nil {
929
934
r .recordSparkApplicationEvent (app )
930
- return fmt .Errorf ("failed to submit spark application: %v" , err )
935
+ submitErr = fmt .Errorf ("failed to submit spark application: %v" , err )
936
+ return
931
937
}
932
-
933
- return nil
934
938
}
935
939
936
940
// updateDriverState finds the driver pod of the application
0 commit comments