|  | 
| 26 | 26 | import org.apache.flink.runtime.clusterframework.types.AllocationID; | 
| 27 | 27 | import org.apache.flink.runtime.clusterframework.types.ResourceID; | 
| 28 | 28 | import org.apache.flink.runtime.clusterframework.types.SlotID; | 
|  | 29 | +import org.apache.flink.runtime.entrypoint.ClusterInformation; | 
| 29 | 30 | import org.apache.flink.runtime.entrypoint.WorkingDirectory; | 
| 30 | 31 | import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; | 
|  | 32 | +import org.apache.flink.runtime.instance.InstanceID; | 
| 31 | 33 | import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; | 
| 32 | 34 | import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; | 
| 33 | 35 | import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; | 
| 34 | 36 | import org.apache.flink.runtime.messages.Acknowledge; | 
|  | 37 | +import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; | 
| 35 | 38 | import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; | 
| 36 | 39 | import org.apache.flink.runtime.rpc.TestingRpcService; | 
| 37 | 40 | import org.apache.flink.runtime.rpc.TestingRpcServiceExtension; | 
| @@ -119,7 +122,8 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) | 
| 119 | 122 |         assertThat(slotReport.getNumSlotStatus(), is(2)); | 
| 120 | 123 | 
 | 
| 121 | 124 |         final SlotStatus slotStatus = slotReport.iterator().next(); | 
| 122 |  | -        final SlotID allocatedSlotID = slotStatus.getSlotID(); | 
|  | 125 | +        final SlotID allocatedSlotID = | 
|  | 126 | +                SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID()); | 
| 123 | 127 | 
 | 
| 124 | 128 |         final AllocationID allocationId = new AllocationID(); | 
| 125 | 129 |         taskExecutorGateway | 
| @@ -178,6 +182,145 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) | 
| 178 | 182 |         assertThat(offeredSlot.getAllocationId(), is(allocationId)); | 
| 179 | 183 |     } | 
| 180 | 184 | 
 | 
|  | 185 | +    @Test | 
|  | 186 | +    void testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest( | 
|  | 187 | +            @TempDir File tempDir) throws Exception { | 
|  | 188 | +        final ResourceID resourceId = ResourceID.generate(); | 
|  | 189 | + | 
|  | 190 | +        final Configuration configuration = new Configuration(); | 
|  | 191 | +        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 2); | 
|  | 192 | +        configuration.set(StateRecoveryOptions.LOCAL_RECOVERY, true); | 
|  | 193 | + | 
|  | 194 | +        final TestingResourceManagerGateway testingResourceManagerGateway = | 
|  | 195 | +                new TestingResourceManagerGateway(); | 
|  | 196 | +        final ArrayBlockingQueue<TaskExecutorSlotReport> queue = new ArrayBlockingQueue<>(2); | 
|  | 197 | +        testingResourceManagerGateway.setSendSlotReportFunction( | 
|  | 198 | +                slotReportInformation -> { | 
|  | 199 | +                    queue.offer( | 
|  | 200 | +                            TaskExecutorSlotReport.create( | 
|  | 201 | +                                    slotReportInformation.f0, slotReportInformation.f2)); | 
|  | 202 | +                    return CompletableFuture.completedFuture(Acknowledge.get()); | 
|  | 203 | +                }); | 
|  | 204 | + | 
|  | 205 | +        final ArrayBlockingQueue<TaskExecutorRegistration> taskExecutorRegistrations = | 
|  | 206 | +                new ArrayBlockingQueue<>(2); | 
|  | 207 | + | 
|  | 208 | +        testingResourceManagerGateway.setRegisterTaskExecutorFunction( | 
|  | 209 | +                taskExecutorRegistration -> { | 
|  | 210 | +                    taskExecutorRegistrations.offer(taskExecutorRegistration); | 
|  | 211 | +                    return CompletableFuture.completedFuture( | 
|  | 212 | +                            new TaskExecutorRegistrationSuccess( | 
|  | 213 | +                                    new InstanceID(), | 
|  | 214 | +                                    taskExecutorRegistration.getResourceId(), | 
|  | 215 | +                                    new ClusterInformation("localhost", 1234), | 
|  | 216 | +                                    null)); | 
|  | 217 | +                }); | 
|  | 218 | + | 
|  | 219 | +        final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService(); | 
|  | 220 | +        rpcService.registerGateway( | 
|  | 221 | +                testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); | 
|  | 222 | + | 
|  | 223 | +        final JobID jobId = new JobID(); | 
|  | 224 | + | 
|  | 225 | +        final TestingHighAvailabilityServices highAvailabilityServices = | 
|  | 226 | +                new TestingHighAvailabilityServices(); | 
|  | 227 | + | 
|  | 228 | +        highAvailabilityServices.setResourceManagerLeaderRetriever( | 
|  | 229 | +                new SettableLeaderRetrievalService( | 
|  | 230 | +                        testingResourceManagerGateway.getAddress(), | 
|  | 231 | +                        testingResourceManagerGateway.getFencingToken().toUUID())); | 
|  | 232 | +        final SettableLeaderRetrievalService jobMasterLeaderRetriever = | 
|  | 233 | +                new SettableLeaderRetrievalService(); | 
|  | 234 | +        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever); | 
|  | 235 | + | 
|  | 236 | +        final WorkingDirectory workingDirectory = WorkingDirectory.create(tempDir); | 
|  | 237 | +        final TaskExecutor taskExecutor = | 
|  | 238 | +                TaskExecutorBuilder.newBuilder( | 
|  | 239 | +                                rpcService, highAvailabilityServices, workingDirectory) | 
|  | 240 | +                        .setConfiguration(configuration) | 
|  | 241 | +                        .setResourceId(resourceId) | 
|  | 242 | +                        .build(); | 
|  | 243 | + | 
|  | 244 | +        taskExecutor.start(); | 
|  | 245 | + | 
|  | 246 | +        final TaskExecutorGateway taskExecutorGateway = | 
|  | 247 | +                taskExecutor.getSelfGateway(TaskExecutorGateway.class); | 
|  | 248 | + | 
|  | 249 | +        final TaskExecutorSlotReport taskExecutorSlotReport = queue.take(); | 
|  | 250 | + | 
|  | 251 | +        final SlotReport slotReport = taskExecutorSlotReport.getSlotReport(); | 
|  | 252 | + | 
|  | 253 | +        assertThat(slotReport.getNumSlotStatus(), is(2)); | 
|  | 254 | + | 
|  | 255 | +        final TaskExecutorRegistration taskExecutorRegistration = taskExecutorRegistrations.take(); | 
|  | 256 | +        assertThat(taskExecutorRegistration.getNumberSlots(), is(2)); | 
|  | 257 | + | 
|  | 258 | +        final SlotStatus slotStatus = slotReport.iterator().next(); | 
|  | 259 | +        final SlotID allocatedSlotID = | 
|  | 260 | +                SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID()); | 
|  | 261 | + | 
|  | 262 | +        final AllocationID allocationId = new AllocationID(); | 
|  | 263 | +        taskExecutorGateway | 
|  | 264 | +                .requestSlot( | 
|  | 265 | +                        allocatedSlotID, | 
|  | 266 | +                        jobId, | 
|  | 267 | +                        allocationId, | 
|  | 268 | +                        slotStatus.getResourceProfile(), | 
|  | 269 | +                        "localhost", | 
|  | 270 | +                        testingResourceManagerGateway.getFencingToken(), | 
|  | 271 | +                        Duration.ofSeconds(10L)) | 
|  | 272 | +                .join(); | 
|  | 273 | + | 
|  | 274 | +        taskExecutor.close(); | 
|  | 275 | + | 
|  | 276 | +        final BlockingQueue<Collection<SlotOffer>> offeredSlots = new ArrayBlockingQueue<>(1); | 
|  | 277 | + | 
|  | 278 | +        final TestingJobMasterGateway jobMasterGateway = | 
|  | 279 | +                new TestingJobMasterGatewayBuilder() | 
|  | 280 | +                        .setOfferSlotsFunction( | 
|  | 281 | +                                (resourceID, slotOffers) -> { | 
|  | 282 | +                                    offeredSlots.offer(new HashSet<>(slotOffers)); | 
|  | 283 | +                                    return CompletableFuture.completedFuture(slotOffers); | 
|  | 284 | +                                }) | 
|  | 285 | +                        .build(); | 
|  | 286 | +        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); | 
|  | 287 | +        jobMasterLeaderRetriever.notifyListener( | 
|  | 288 | +                jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID()); | 
|  | 289 | + | 
|  | 290 | +        // recover the TaskExecutor | 
|  | 291 | +        final TaskExecutor recoveredTaskExecutor = | 
|  | 292 | +                TaskExecutorBuilder.newBuilder( | 
|  | 293 | +                                rpcService, highAvailabilityServices, workingDirectory) | 
|  | 294 | +                        .setConfiguration(configuration) | 
|  | 295 | +                        .setResourceId(resourceId) | 
|  | 296 | +                        .build(); | 
|  | 297 | + | 
|  | 298 | +        recoveredTaskExecutor.start(); | 
|  | 299 | + | 
|  | 300 | +        final TaskExecutorSlotReport recoveredSlotReport = queue.take(); | 
|  | 301 | +        assertThat(recoveredSlotReport.getSlotReport().getNumSlotStatus(), is(3)); | 
|  | 302 | +        for (SlotStatus status : recoveredSlotReport.getSlotReport()) { | 
|  | 303 | +            // dynamic slot request | 
|  | 304 | +            if (status.getSlotID().getSlotNumber() == 2) { | 
|  | 305 | +                assertThat(status.getJobID(), is(jobId)); | 
|  | 306 | +                assertThat(status.getAllocationID(), is(allocationId)); | 
|  | 307 | +            } else { | 
|  | 308 | +                assertThat(status.getJobID(), is(nullValue())); | 
|  | 309 | +            } | 
|  | 310 | +        } | 
|  | 311 | + | 
|  | 312 | +        final TaskExecutorRegistration recoveredTaskExecutorRegistration = | 
|  | 313 | +                taskExecutorRegistrations.take(); | 
|  | 314 | +        assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2)); | 
|  | 315 | + | 
|  | 316 | +        final Collection<SlotOffer> take = offeredSlots.take(); | 
|  | 317 | + | 
|  | 318 | +        assertThat(take, hasSize(1)); | 
|  | 319 | +        final SlotOffer offeredSlot = take.iterator().next(); | 
|  | 320 | + | 
|  | 321 | +        assertThat(offeredSlot.getAllocationId(), is(allocationId)); | 
|  | 322 | +    } | 
|  | 323 | + | 
| 181 | 324 |     private static final class TaskExecutorSlotReport { | 
| 182 | 325 |         private final ResourceID taskExecutorResourceId; | 
| 183 | 326 |         private final SlotReport slotReport; | 
|  | 
0 commit comments