Skip to content

Commit

Permalink
Improving logic to cancel a running job (#213)
Browse files Browse the repository at this point in the history
* Improving logic to cancel a job
  • Loading branch information
alfespa17 authored Jun 11, 2022
1 parent 3e68351 commit b1c6c21
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
}
break;
case cancelled:
case failed:
case rejected:
try {
log.info("Deleting Cancelled/Rejected Job Context {}", PREFIX_JOB_CONTEXT + job.getId());
rejectJob(job.getId());
log.info("Deleting Failed/Cancelled/Rejected Job Context {} from Quartz", PREFIX_JOB_CONTEXT + job.getId());
cancelJobSteps(job.getId());
jobExecutionContext.getScheduler().deleteJob(new JobKey(PREFIX_JOB_CONTEXT + job.getId()));
} catch (SchedulerException e) {
log.error(e.getMessage());
Expand Down Expand Up @@ -97,6 +98,11 @@ private void executePendingJob(Job job) {
case customScripts:
if (executorService.execute(job, stepId, flow.get()))
log.info("Executing Job {} Step Id {}", job.getId(), stepId);
else {
log.error("Error when sending context to executor marking job {} as failed", job.getId());
job.setStatus(JobStatus.failed);
jobRepository.save(job);
}
break;
case approval:
job.setStatus(JobStatus.waitingApproval);
Expand Down Expand Up @@ -128,9 +134,9 @@ private void executeApprovedJobs(Job job) {
}
}

private void rejectJob(int jobId){
private void cancelJobSteps(int jobId){
for(Step step: stepRepository.findByJobId(jobId)){
if(step.getStatus().equals(JobStatus.pending)){
if(step.getStatus().equals(JobStatus.pending) || step.getStatus().equals(JobStatus.running)){
step.setStatus(JobStatus.cancelled);
stepRepository.save(step);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.azbuilder.api.repository.StepRepository;
import org.azbuilder.api.rs.job.Job;
import org.azbuilder.api.rs.job.JobStatus;
import org.azbuilder.api.rs.job.step.Step;
import org.quartz.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.text.ParseException;

Expand All @@ -19,6 +23,8 @@ public class ScheduleJobService {

Scheduler scheduler;

StepRepository stepRepository;

public void createJobTrigger(String cronExpression, String triggerId) throws ParseException, SchedulerException {

JobDataMap jobDataMap = new JobDataMap();
Expand Down Expand Up @@ -72,4 +78,16 @@ public void deleteJobTrigger(String triggerId) throws ParseException, SchedulerE
scheduler.deleteJob(new JobKey(PREFIX_JOB + triggerId));
}

@Transactional
public void deleteJobContext(int jobId) throws ParseException, SchedulerException {
log.info("Delete Job Context {}", jobId);
scheduler.deleteJob(new JobKey(PREFIX_JOB_CONTEXT + jobId));
for(Step step: stepRepository.findByJobId(jobId)){
if(step.getStatus().equals(JobStatus.pending) || step.getStatus().equals(JobStatus.running)){
step.setStatus(JobStatus.cancelled);
stepRepository.save(step);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.extern.slf4j.Slf4j;
import org.azbuilder.api.plugin.scheduler.ScheduleJobService;
import org.azbuilder.api.rs.job.Job;
import org.azbuilder.api.rs.job.JobStatus;
import org.quartz.SchedulerException;

import java.text.ParseException;
Expand All @@ -23,10 +24,17 @@ public class JobManageHook implements LifeCycleHook<Job> {
public void execute(LifeCycleHookBinding.Operation operation, LifeCycleHookBinding.TransactionPhase transactionPhase, Job job, RequestScope requestScope, Optional<ChangeSpec> optional) {
log.info("JobCreateHook {}", job.getId());
try {
if(operation.equals(LifeCycleHookBinding.Operation.CREATE)) {
scheduleJobService.createJobContext(job);
}else {
log.info("Not supported {}", operation);
switch (operation){
case CREATE:
scheduleJobService.createJobContext(job);
break;
case UPDATE:
if(job.getStatus().equals(JobStatus.cancelled))
scheduleJobService.deleteJobContext(job.getId());
break;
default:
log.info("Not supported {}", operation);
break;
}

} catch (ParseException | SchedulerException e) {
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/org/azbuilder/api/rs/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;

@LifeCycleHookBinding(operation = LifeCycleHookBinding.Operation.CREATE, phase = LifeCycleHookBinding.TransactionPhase.POSTCOMMIT, hook = JobManageHook.class)
@LifeCycleHookBinding(operation = LifeCycleHookBinding.Operation.UPDATE, phase = LifeCycleHookBinding.TransactionPhase.POSTCOMMIT, hook = JobManageHook.class)
@Include(rootLevel = false)
@Getter
@Setter
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/org/azbuilder/api/rs/job/JobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ public enum JobStatus {
completed,

rejected,
cancelled
cancelled,
failed
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,26 @@ public void setRunningStatus(TerraformJob terraformJob) {
public void setCompletedStatus(boolean successful, TerraformJob terraformJob, String jobOutput, String jobErrorOutput, String jobPlan) {
if (!executorFlagsProperties.isDisableAcknowledge()) {
updateStepStatus(terraformJob.getOrganizationId(), terraformJob.getJobId(), terraformJob.getStepId(), jobOutput, jobErrorOutput);
updateJobStatus(successful, terraformJob.getOrganizationId(), terraformJob.getJobId(), terraformJob.getStepId(), jobOutput, jobErrorOutput, jobPlan);
if(!isJobCancelled(terraformJob))
updateJobStatus(successful, terraformJob.getOrganizationId(), terraformJob.getJobId(), terraformJob.getStepId(), jobOutput, jobErrorOutput, jobPlan);
}
}

private boolean isJobCancelled(TerraformJob terraformJob){
Job job = terrakubeClient.getJobById(terraformJob.getOrganizationId(), terraformJob.getJobId()).getData();
if(job.getAttributes().getStatus().equals("cancelled")) {
log.warn("Job {} was cancelled when running executor", terraformJob.getJobId());
return true;
}
else {
log.info("Job {} is still active", terraformJob.getJobId());
return false;
}
}

private void updateJobStatus(boolean successful, String organizationId, String jobId, String stepId, String jobOutput, String jobErrorOutput, String jobPlan) {
Job job = terrakubeClient.getJobById(organizationId, jobId).getData();
job.getAttributes().setStatus(successful ? "pending" : "completed");
job.getAttributes().setStatus(successful ? "pending" : "failed");

log.info("output: {}", jobOutput.length());
log.info("outputError: {}", jobErrorOutput.length());
Expand Down

0 comments on commit b1c6c21

Please sign in to comment.