From b691ae179be9c0ef15a95ba28a2c54d0d8500d0b Mon Sep 17 00:00:00 2001 From: mateczagany Date: Sun, 28 Sep 2025 18:45:48 +0200 Subject: [PATCH 1/2] [FLINK-38406][runtime] Fix DefaultSchedulerCheckpointCoordinatorTest failures --- .../checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java index 363fe516b8e66..4dbad7884f61f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; @@ -212,6 +213,7 @@ private DefaultScheduler createSchedulerAndEnableCheckpointing( EXECUTOR_EXTENSION.getExecutor()) .setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(store, counter)) .setRpcTimeout(timeout) + .setFutureExecutor(new DirectScheduledExecutorService()) .build(); } } From 5acaa5edf2b7db310cb48430e86f84bf3424f7da Mon Sep 17 00:00:00 2001 From: mateczagany Date: Tue, 30 Sep 2025 07:47:25 +0200 Subject: [PATCH 2/2] [FLINK-38406] Run ExecutionGraph operations from main thread --- ...ultSchedulerCheckpointCoordinatorTest.java | 58 ++++++++++++------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java index 4dbad7884f61f..51562ef19ab34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -34,7 +34,6 @@ import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; @@ -57,6 +56,13 @@ class DefaultSchedulerCheckpointCoordinatorTest { private static final TestExecutorExtension EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension(); + @RegisterExtension + static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Extension(); + + private final TestingComponentMainThreadExecutor mainThreadExecutor = + MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + /** Tests that the checkpoint coordinator is shut down if the execution graph is failed. */ @Test void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph() @@ -78,9 +84,14 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph() assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - graph.failJob(new Exception("Test Exception"), System.currentTimeMillis()); - - scheduler.closeAsync().get(); + mainThreadExecutor + .execute( + () -> { + graph.failJob( + new Exception("Test Exception"), System.currentTimeMillis()); + return scheduler.closeAsync(); + }) + .get(); assertThat(checkpointCoordinator.isShutdown()).isTrue(); assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FAILED); @@ -108,9 +119,13 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnSuspendedExecutionGraph assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - graph.suspend(new Exception("Test Exception")); - - scheduler.closeAsync().get(); + mainThreadExecutor + .execute( + () -> { + graph.suspend(new Exception("Test Exception")); + return scheduler.closeAsync(); + }) + .get(); assertThat(checkpointCoordinator.isShutdown()).isTrue(); assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.SUSPENDED); @@ -138,18 +153,22 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnFinishedExecutionGraph( assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - scheduler.startScheduling(); - - for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) { - final Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); - scheduler.updateTaskExecutionState( - new TaskExecutionState( - currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED)); - } + mainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) { + final Execution currentExecutionAttempt = + executionVertex.getCurrentExecutionAttempt(); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + currentExecutionAttempt.getAttemptId(), + ExecutionState.FINISHED)); + } + }); assertThat(graph.getTerminationFuture()).isCompletedWithValue(JobStatus.FINISHED); - scheduler.closeAsync().get(); + mainThreadExecutor.execute(scheduler::closeAsync).get(); assertThat(checkpointCoordinator.isShutdown()).isTrue(); assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FINISHED); @@ -177,7 +196,7 @@ void testClosingSchedulerSuspendsExecutionGraphAndShutsDownCheckpointCoordinator assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - scheduler.closeAsync().get(); + mainThreadExecutor.execute(scheduler::closeAsync).get(); assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED); assertThat(checkpointCoordinator.isShutdown()).isTrue(); @@ -209,11 +228,10 @@ private DefaultScheduler createSchedulerAndEnableCheckpointing( return new DefaultSchedulerBuilder( jobGraph, - ComponentMainThreadExecutorServiceAdapter.forMainThread(), + mainThreadExecutor.getMainThreadExecutor(), EXECUTOR_EXTENSION.getExecutor()) .setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(store, counter)) .setRpcTimeout(timeout) - .setFutureExecutor(new DirectScheduledExecutorService()) .build(); } }