Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.dinky.function.pool.UdfCodePool;
import org.dinky.job.ClearJobHistoryTask;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.RecheckJobTask;
import org.dinky.resource.BaseResourceManager;
import org.dinky.scheduler.client.ProjectClient;
import org.dinky.scheduler.exception.SchedulerException;
Expand Down Expand Up @@ -160,6 +161,9 @@ private void initDaemon() {
DaemonTask clearJobHistoryTask = DaemonTask.build(new DaemonTaskConfig(ClearJobHistoryTask.TYPE));
schedule.addSchedule(clearJobHistoryTask, new PeriodicTrigger(1, TimeUnit.HOURS));

DaemonTask recheckJobTask = DaemonTask.build(new DaemonTaskConfig(RecheckJobTask.TYPE));
schedule.addSchedule(recheckJobTask, new PeriodicTrigger(5, TimeUnit.MINUTES));

// Add flink running job task to flink job thread pool
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance();
Expand Down
126 changes: 126 additions & 0 deletions dinky-admin/src/main/java/org/dinky/job/RecheckJobTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.job;

import org.dinky.api.FlinkAPI;
import org.dinky.context.SpringContextUtils;
import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.model.ext.JobInfoDetail;
import org.dinky.data.model.job.JobInstance;
import org.dinky.service.JobInstanceService;

import java.util.List;
import java.util.Optional;

import org.springframework.context.annotation.DependsOn;

import com.fasterxml.jackson.databind.JsonNode;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@DependsOn("springContextUtils")
@Slf4j
@Data
public class RecheckJobTask implements DaemonTask {

    /** Type key used to register/build this task via {@link DaemonTask#build}. */
    public static final String TYPE = RecheckJobTask.class.toString();

    private DaemonTaskConfig config;

    // Resolved once at class load; safe because @DependsOn("springContextUtils")
    // guarantees the Spring context holder is initialized first.
    private static final JobInstanceService jobInstanceService =
            SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);

    @Override
    public DaemonTask setConfig(DaemonTaskConfig config) {
        this.config = config;
        return this;
    }

    @Override
    public DaemonTaskConfig getConfig() {
        return config;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Rechecks job instances whose latest status is terminal/unknown.
     *
     * <p>Since flink-operator supports task redeployment and automatic recovery of
     * failed jobs, a job recorded as FAILED/CANCELED/FINISHED/UNKNOWN may be running
     * again. If the live status has recovered to RUNNING, the job is put back into
     * the Flink job monitoring thread pool.
     *
     * @return always {@code true} so the daemon scheduler keeps this task alive
     */
    @Override
    public boolean dealTask() {
        log.info("Starting recheck of job instances...");

        List<JobInstance> jobInstances = jobInstanceService.listJobInstancesToRecheck();
        log.info("Found {} job instances to recheck", jobInstances.size());

        FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance();
        for (JobInstance jobInstance : jobInstances) {
            log.info(
                    "Rechecking job instance: id={}, name={}, taskId={}",
                    jobInstance.getId(),
                    jobInstance.getName(),
                    jobInstance.getTaskId());
            JobInfoDetail jobInfoDetail = jobInstanceService.getJobInfoDetail(jobInstance.getId());
            Optional<JobStatus> newStatus = this.recheckJobInstanceStatus(jobInfoDetail);
            log.info("Job '{}' status after recheck: {}", jobInstance.getName(), newStatus.orElse(null));
            if (newStatus.isPresent() && newStatus.get() == JobStatus.RUNNING) {
                log.info("Job '{}' is RUNNING again, re-adding to monitoring queue", jobInstance.getName());
                DaemonTaskConfig config =
                        DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId());
                DaemonTask daemonTask = DaemonTask.build(config);
                flinkJobThreadPool.execute(daemonTask);
            }
        }
        log.info("Job recheck completed.");
        return true;
    }

    /**
     * Queries the live status of the first job on the instance's JobManager.
     *
     * <p>NOTE(review): only the first job returned by the REST API is inspected —
     * assumes application mode with a single job per cluster; verify for session mode.
     *
     * @param jobInfoDetail the job detail carrying the cluster instance address
     * @return the current {@link JobStatus}, or empty if it cannot be determined
     */
    private Optional<JobStatus> recheckJobInstanceStatus(JobInfoDetail jobInfoDetail) {
        try {
            String jmHost = jobInfoDetail.getClusterInstance().getJobManagerHost();
            log.info("Querying job status from JobManager host: {}", jmHost);
            List<JsonNode> jobs = FlinkAPI.build(jmHost).listJobs();
            if (jobs == null || jobs.isEmpty()) {
                log.warn("No jobs found on JobManager host: {}", jmHost);
                return Optional.empty();
            }
            JsonNode firstJob = jobs.get(0);
            log.debug("Job response: {}", firstJob);
            JsonNode stateNode = firstJob.get("state");
            // Guard against a malformed REST response missing the "state" field.
            if (stateNode == null || stateNode.isNull()) {
                log.warn("Job response from {} has no 'state' field", jmHost);
                return Optional.empty();
            }
            String newStatus = stateNode.asText();
            log.info("Fetched job state: {}", newStatus);
            return Optional.of(JobStatus.valueOf(newStatus));
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J records the stack trace.
            log.warn(
                    "Failed to fetch job status, task: {}",
                    jobInfoDetail.getInstance().getName(),
                    e);
            return Optional.empty();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave)

checkAndRefreshCluster(jobInfoDetail);

checkAndRefreshJobId(jobInfoDetail);

// Update the value of JobData from the flink api while ignoring the null value to prevent
// some other configuration from being overwritten
BeanUtil.copyProperties(
Expand Down Expand Up @@ -225,6 +227,47 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave)
return isDone;
}

/**
* In Flink operator mode, resource scaling triggers a job redeployment, which results in a new job ID.
* The system will update to the latest job ID accordingly.
*
*
* @param jobInfoDetail The job info detail.
*/
/**
 * Refreshes the stored Flink job ID when it has changed on the cluster.
 *
 * <p>In Flink operator mode, resource scaling triggers a job redeployment, which
 * results in a new job ID. The system updates the persisted instance to the latest
 * job ID accordingly. Only applies to Kubernetes application-mode clusters; all
 * other gateway types return immediately.
 *
 * @param jobInfoDetail the job info detail carrying the cluster instance and job instance
 */
public static void checkAndRefreshJobId(JobInfoDetail jobInfoDetail) {
    if (!GatewayType.get(jobInfoDetail.getClusterInstance().getType()).isKubernetesApplicationMode()) {
        return;
    }

    List<JsonNode> jobs = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost())
            .listJobs();
    if (jobs == null || jobs.isEmpty()) {
        log.info(
                "No running jobs found on task: {}",
                jobInfoDetail.getClusterInstance().getJobManagerHost());
        return;
    }

    // Application mode runs a single job per cluster, so the first entry is the job.
    // (The previous stream().findFirst().orElse(jobs.get(0)) was a no-op construction.)
    JsonNode firstJob = jobs.get(0);
    JsonNode jidNode = firstJob.get("jid");
    // Guard against a malformed REST response missing the "jid" field.
    if (jidNode == null || jidNode.isNull()) {
        log.warn(
                "Job response from {} has no 'jid' field",
                jobInfoDetail.getClusterInstance().getJobManagerHost());
        return;
    }
    String latestJobId = jidNode.asText();
    String currentJobId = jobInfoDetail.getInstance().getJid();
    if (!latestJobId.equals(currentJobId)) {
        JobInstance jobInstance = jobInfoDetail.getInstance();
        jobInstance.setJid(latestJobId);
        jobInstanceService.updateById(jobInstance);
        log.info(
                "JobId for [{}] has been refreshed: {} -> {}",
                jobInfoDetail.getInstance().getName(),
                currentJobId,
                latestJobId);
    } else {
        log.debug(
                "JobId for [{}] is up to date: {}",
                jobInfoDetail.getInstance().getName(),
                currentJobId);
    }
}

/**
* Retrieves job history.
* getJobStatusInformationFromFlinkRestAPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface JobInstanceMapper extends SuperMapper<JobInstance> {
@InterceptorIgnore(tenantLine = "true")
List<JobInstance> listJobInstanceActive();

/**
 * Lists the latest job instance per task whose status is terminal or unknown
 * (FAILED, CANCELED, FINISHED, UNKNOWN), as candidates for status rechecking.
 * Tenant-line filtering is bypassed so instances across all tenants are returned.
 *
 * @return job instances eligible for recheck
 */
@InterceptorIgnore(tenantLine = "true")
List<JobInstance> listJobInstancesToRecheck();

JobInstance getJobInstanceByTaskId(Integer id);

@InterceptorIgnore(tenantLine = "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
*/
List<JobInstance> listJobInstanceActive();

List<JobInstance> listJobInstancesToRecheck();

/**
* Get the job information detail for the given ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public List<JobInstance> listJobInstanceActive() {
return baseMapper.listJobInstanceActive();
}

/**
 * Delegates to the mapper to fetch job instances whose latest status is
 * terminal or unknown, for recheck by {@code RecheckJobTask}.
 *
 * @return job instances eligible for recheck
 */
@Override
public List<JobInstance> listJobInstancesToRecheck() {
return baseMapper.listJobInstancesToRecheck();
}

@Override
public JobInfoDetail getJobInfoDetail(Integer id) {
if (Asserts.isNull(TenantContextHolder.get())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
org.dinky.job.FlinkJobTask
org.dinky.job.SystemMetricsTask
org.dinky.job.ClearJobHistoryTask
org.dinky.job.ClearJobHistoryTask
org.dinky.job.RecheckJobTask
13 changes: 13 additions & 0 deletions dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@
order by id desc
</select>

<!-- Latest instance per task (by max create_time) whose status is terminal/unknown;
     candidates for the RecheckJobTask. Note: if two instances of a task share the
     same create_time, both rows are returned. -->
<select id="listJobInstancesToRecheck" resultType="org.dinky.data.model.job.JobInstance">
    select dji.*
    from dinky_job_instance dji
    join (
        select task_id, max(create_time) as max_ct
        from dinky_job_instance
        group by task_id
    ) latest
    on dji.task_id = latest.task_id and dji.create_time = latest.max_ct
    WHERE dji.status IN ('FAILED', 'CANCELED', 'FINISHED', 'UNKNOWN')
    order by dji.id desc
</select>

<select id="getJobInstanceByTaskId" resultType="org.dinky.data.model.job.JobInstance">
select *
from dinky_job_instance
Expand Down
Loading