1919package org .apache .flink .runtime .executiongraph ;
2020
2121import org .apache .flink .api .common .JobStatus ;
22- import org .apache .flink .runtime .concurrent .ComponentMainThreadExecutorServiceAdapter ;
2322import org .apache .flink .runtime .concurrent .ManuallyTriggeredScheduledExecutorService ;
2423import org .apache .flink .runtime .execution .ExecutionState ;
2524import org .apache .flink .runtime .executiongraph .failover .FixedDelayRestartBackoffTimeStrategy ;
3534import org .apache .flink .runtime .scheduler .TestingPhysicalSlot ;
3635import org .apache .flink .runtime .scheduler .TestingPhysicalSlotProvider ;
3736import org .apache .flink .runtime .taskmanager .TaskManagerLocation ;
38- import org .apache .flink .runtime .testutils .DirectScheduledExecutorService ;
3937import org .apache .flink .testutils .TestingUtils ;
4038import org .apache .flink .testutils .executor .TestExecutorExtension ;
4139import org .apache .flink .util .FlinkException ;
@@ -58,6 +56,13 @@ class ExecutionGraphCoLocationRestartTest {
5856 static final TestExecutorExtension <ScheduledExecutorService > EXECUTOR_RESOURCE =
5957 TestingUtils .defaultExecutorExtension ();
6058
59+ @ RegisterExtension
60+ static final TestingComponentMainThreadExecutor .Extension MAIN_EXECUTOR_RESOURCE =
61+ new TestingComponentMainThreadExecutor .Extension ();
62+
63+ private final TestingComponentMainThreadExecutor mainThreadExecutor =
64+ MAIN_EXECUTOR_RESOURCE .getComponentMainThreadTestExecutor ();
65+
6166 private static final int NUM_TASKS = 31 ;
6267
6368 @ Test
@@ -83,11 +88,10 @@ void testConstraintsAfterRestart() throws Exception {
8388
8489 final ManuallyTriggeredScheduledExecutorService delayExecutor =
8590 new ManuallyTriggeredScheduledExecutorService ();
86- final DirectScheduledExecutorService futureExecutor = new DirectScheduledExecutorService ();
8791 final SchedulerBase scheduler =
8892 new DefaultSchedulerBuilder (
8993 jobGraph ,
90- ComponentMainThreadExecutorServiceAdapter . forMainThread (),
94+ mainThreadExecutor . getMainThreadExecutor (),
9195 EXECUTOR_RESOURCE .getExecutor ())
9296 .setExecutionSlotAllocatorFactory (
9397 SchedulerTestingUtils .newSlotSharingExecutionSlotAllocatorFactory (
@@ -97,7 +101,6 @@ void testConstraintsAfterRestart() throws Exception {
97101 TestingPhysicalSlot .builder ()
98102 .build ()))))
99103 .setDelayExecutor (delayExecutor )
100- .setFutureExecutor (futureExecutor )
101104 .setRestartBackoffTimeStrategy (
102105 new FixedDelayRestartBackoffTimeStrategy
103106 .FixedDelayRestartBackoffTimeStrategyFactory (1 , 0 )
@@ -109,7 +112,7 @@ void testConstraintsAfterRestart() throws Exception {
109112 // enable the queued scheduling for the slot pool
110113 assertThat (eg .getState ()).isEqualTo (JobStatus .CREATED );
111114
112- scheduler . startScheduling ( );
115+ mainThreadExecutor . execute ( scheduler :: startScheduling );
113116
114117 Predicate <AccessExecution > isDeploying =
115118 ExecutionGraphTestUtils .isInExecutionState (ExecutionState .DEPLOYING );
@@ -120,19 +123,28 @@ void testConstraintsAfterRestart() throws Exception {
120123 // sanity checks
121124 validateConstraints (eg );
122125
123- eg .getAllExecutionVertices ().iterator ().next ().fail (new FlinkException ("Test exception" ));
126+ mainThreadExecutor .execute (
127+ () -> {
128+ eg .getAllExecutionVertices ()
129+ .iterator ()
130+ .next ()
131+ .fail (new FlinkException ("Test exception" ));
132+ });
124133
125134 assertThat (eg .getState ()).isEqualTo (JobStatus .RESTARTING );
126135
127136 // trigger registration of restartTasks(...) callback to cancelFuture before completing the
128137 // cancellation. This ensures the restarting actions to be performed in main thread.
129138 delayExecutor .triggerNonPeriodicScheduledTask ();
130139
131- for (ExecutionVertex vertex : eg .getAllExecutionVertices ()) {
132- if (vertex .getExecutionState () == ExecutionState .CANCELING ) {
133- vertex .getCurrentExecutionAttempt ().completeCancelling ();
134- }
135- }
140+ mainThreadExecutor .execute (
141+ () -> {
142+ for (ExecutionVertex vertex : eg .getAllExecutionVertices ()) {
143+ if (vertex .getExecutionState () == ExecutionState .CANCELING ) {
144+ vertex .getCurrentExecutionAttempt ().completeCancelling ();
145+ }
146+ }
147+ });
136148
137149 // wait until we have restarted
138150 ExecutionGraphTestUtils .waitUntilJobStatus (eg , JobStatus .RUNNING , timeout );
@@ -142,7 +154,10 @@ void testConstraintsAfterRestart() throws Exception {
142154 // checking execution vertex properties
143155 validateConstraints (eg );
144156
145- ExecutionGraphTestUtils .finishAllVertices (eg );
157+ mainThreadExecutor .execute (
158+ () -> {
159+ ExecutionGraphTestUtils .finishAllVertices (eg );
160+ });
146161
147162 assertThat (eg .getState ()).isEqualTo (FINISHED );
148163 }
0 commit comments