Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,36 @@ public class LoggerController extends BaseController {
@GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
public Result<ResponseTaskLog> queryLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return loggerService.queryLog(loginUser, taskInstanceId, skipNum, limit);
public Result<ResponseTaskLog> queryTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return loggerService.queryTaskLog(loginUser, taskInstanceId, skipNum, limit);
}

/**
* query task output
*
* @param loginUser login user
* @param taskInstanceId task instance id
* @param skipNum skip number
* @param limit limit
* @return task log content
*/
@Operation(summary = "queryOutput", description = "QUERY_TASK_INSTANCE_OUTPUT_NOTES")
@Parameters({
@Parameter(name = "taskInstanceId", description = "TASK_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
@Parameter(name = "skipLineNum", description = "SKIP_LINE_NUM", required = true, schema = @Schema(implementation = int.class, example = "100")),
@Parameter(name = "limit", description = "LIMIT", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@GetMapping(value = "/output_detail")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
public Result<ResponseTaskLog> queryTaskOutput(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return loggerService.queryTaskOutput(loginUser, taskInstanceId, skipNum, limit);
}

/**
Expand All @@ -97,12 +122,36 @@ public Result<ResponseTaskLog> queryLog(@Parameter(hidden = true) @RequestAttrib
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
public ResponseEntity downloadTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
byte[] logBytes = loggerService.getLogBytes(loginUser, taskInstanceId);
byte[] logBytes = loggerService.getTaskLogBytes(loginUser, taskInstanceId);
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"")
.body(logBytes);
}

/**
* download task output file
*
* @param loginUser login user
* @param taskInstanceId task instance id
* @return task output file content
*/
@Operation(summary = "downloadTaskOutput", description = "DOWNLOAD_TASK_INSTANCE_OUTPUT_NOTES")
@Parameters({
@Parameter(name = "taskInstanceId", description = "TASK_ID", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@GetMapping(value = "/download-output")
@ResponseBody
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
public ResponseEntity downloadTaskOutput(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
byte[] outputBytes = loggerService.getTaskOutputBytes(loginUser, taskInstanceId);
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + System.currentTimeMillis() + ".output.log" + "\"")
.body(outputBytes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ public class LocalLogClient {
* @param taskInstance The task instance object, containing information needed to retrieve the log.
* @return The complete log file download response of the task instance, including log content and metadata.
*/
public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance) {
return getLocalWholeLog(taskInstance);
public TaskInstanceLogFileDownloadResponse getTaskLog(TaskInstance taskInstance) {
return getLocalWholeLog(taskInstance, TaskLogType.LOG);
}

public TaskInstanceLogFileDownloadResponse getTaskOutput(TaskInstance taskInstance) {
return getLocalWholeLog(taskInstance, TaskLogType.OUTPUT);
}

/**
Expand All @@ -55,23 +59,28 @@ public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance
* @param limit The maximum number of lines to read, indicating the maximum number of lines to retrieve in this query.
* @return The partial log query response, including log content within the specified range and metadata.
*/
public TaskInstanceLogPageQueryResponse getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) {
return getLocalPartLog(taskInstance, skipLineNum, limit);
public TaskInstanceLogPageQueryResponse getTaskLog(TaskInstance taskInstance, int skipLineNum, int limit) {
return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG);
}

public TaskInstanceLogPageQueryResponse getTaskOutput(TaskInstance taskInstance, int skipLineNum, int limit) {
return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT);
}

private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance) {
private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) {
TaskInstanceLogFileDownloadRequest request = new TaskInstanceLogFileDownloadRequest(
taskInstance.getId(),
taskInstance.getLogPath());
taskLogType.getLogPath(taskInstance));
return getProxyLogService(taskInstance).getTaskInstanceWholeLogFileBytes(request);
}

private TaskInstanceLogPageQueryResponse getLocalPartLog(TaskInstance taskInstance, int skipLineNum,
int limit) {
int limit, TaskLogType taskLogType) {
String logFilePath = taskLogType.getLogPath(taskInstance);
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest
.builder()
.taskInstanceId(taskInstance.getId())
.taskInstanceLogAbsolutePath(taskInstance.getLogPath())
.taskInstanceLogAbsolutePath(logFilePath)
.skipLineNum(skipLineNum)
.limit(limit)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,35 @@ public class LogClientDelegate {
* @param limit The maximum number of log lines to retrieve.
* @return A string containing the specified portion of the log.
*/
public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit) {

public String getTaskLogString(TaskInstance taskInstance, int skipLineNum, int limit) {
return getPartLogString(taskInstance, skipLineNum, limit, TaskLogType.LOG);
}

public String getTaskOutputString(TaskInstance taskInstance, int skipLineNum, int limit) {
return getPartLogString(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT);
}

private String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) {
checkArgs(taskInstance);
if (checkNodeExists(taskInstance)) {
TaskInstanceLogPageQueryResponse response = localLogClient.getPartLog(taskInstance, skipLineNum, limit);
TaskInstanceLogPageQueryResponse response =
taskLogType == TaskLogType.LOG
? localLogClient.getTaskLog(taskInstance, skipLineNum, limit)
: localLogClient.getTaskOutput(taskInstance, skipLineNum, limit);
if (response.getCode() == LogResponseStatus.SUCCESS) {
return response.getLogContent();
} else {
log.warn("get part log string is not success for task instance {}; reason :{}",
taskInstance.getId(), response.getMessage());
return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit);
return taskLogType == TaskLogType.LOG
? remoteLogClient.getTaskLogString(taskInstance, skipLineNum, limit)
: remoteLogClient.getTaskOutputString(taskInstance, skipLineNum, limit);
}
} else {
return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit);
return taskLogType == TaskLogType.LOG
? remoteLogClient.getTaskLogString(taskInstance, skipLineNum, limit)
: remoteLogClient.getTaskOutputString(taskInstance, skipLineNum, limit);
}
}

Expand All @@ -73,19 +89,34 @@ public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int l
* @param taskInstance The task instance object, containing information needed for log retrieval.
* @return A byte array containing the complete log content.
*/
public byte[] getWholeLogBytes(TaskInstance taskInstance) {
public byte[] getTaskLogBytes(TaskInstance taskInstance) {
return getWholeLogBytes(taskInstance, TaskLogType.LOG);
}

public byte[] getTaskOutputBytes(TaskInstance taskInstance) {
return getWholeLogBytes(taskInstance, TaskLogType.OUTPUT);
}

private byte[] getWholeLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) {
checkArgs(taskInstance);
if (checkNodeExists(taskInstance)) {
TaskInstanceLogFileDownloadResponse response = localLogClient.getWholeLog(taskInstance);
TaskInstanceLogFileDownloadResponse response =
taskLogType == TaskLogType.LOG
? localLogClient.getTaskLog(taskInstance)
: localLogClient.getTaskOutput(taskInstance);
if (response.getCode() == LogResponseStatus.SUCCESS) {
return response.getLogBytes();
} else {
log.warn("get whole log bytes is not success for task instance {}; reason :{}", taskInstance.getId(),
response.getMessage());
return remoteLogClient.getWholeLog(taskInstance);
return taskLogType == TaskLogType.LOG
? remoteLogClient.getTaskLogBytes(taskInstance)
: remoteLogClient.getTaskOutputBytes(taskInstance);
}
} else {
return remoteLogClient.getWholeLog(taskInstance);
return taskLogType == TaskLogType.LOG
? remoteLogClient.getTaskLogBytes(taskInstance)
: remoteLogClient.getTaskOutputBytes(taskInstance);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ public class RemoteLogClient {
* @param taskInstance The task instance object, containing information such as the task ID and log path.
* @return Returns the log content in byte array format.
*/
public byte[] getWholeLog(TaskInstance taskInstance) {
return LogUtils.getFileContentBytesFromRemote(taskInstance.getLogPath());
public byte[] getTaskLogBytes(TaskInstance taskInstance) {
return getWholeLog(taskInstance, TaskLogType.LOG);
}

public byte[] getTaskOutputBytes(TaskInstance taskInstance) {
return getWholeLog(taskInstance, TaskLogType.OUTPUT);
}

/**
Expand All @@ -45,10 +49,25 @@ public byte[] getWholeLog(TaskInstance taskInstance) {
* @param limit The maximum number of lines to read.
* @return Returns the specified part of the log content in string format.
*/
public String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) {

public String getTaskLogString(TaskInstance taskInstance, int skipLineNum, int limit) {
return getPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG);
}

public String getTaskOutputString(TaskInstance taskInstance, int skipLineNum, int limit) {
return getPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT);
}

private byte[] getWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) {
return LogUtils.getFileContentBytesFromRemote(taskLogType.getLogPath(taskInstance));
}

private String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) {
// todo We can optimize requests by the actual range, reducing disk usage and network traffic.
return LogUtils.rollViewLogLines(
LogUtils.readPartFileContentFromRemote(taskInstance.getLogPath(), skipLineNum, limit));
LogUtils.readPartFileContentFromRemote(
taskLogType.getLogPath(taskInstance),
skipLineNum, limit));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.executor.logging;

import org.apache.dolphinscheduler.dao.entity.TaskInstance;

public enum TaskLogType {

LOG {

@Override
public String getLogPath(TaskInstance taskInstance) {
return taskInstance.getLogPath();
}
},
OUTPUT {

@Override
public String getLogPath(TaskInstance taskInstance) {
return taskInstance.getTaskOutPutLogPath();
}
};

public abstract String getLogPath(TaskInstance taskInstance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public interface LoggerService {
* @param limit limit
* @return log string data
*/
Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skipLineNum, int limit);
Result<ResponseTaskLog> queryTaskLog(User loginUser, int taskInstId, int skipLineNum, int limit);

Result<ResponseTaskLog> queryTaskOutput(User loginUser, int taskInstId, int skipLineNum, int limit);

/**
* get log size
Expand All @@ -44,27 +46,8 @@ public interface LoggerService {
* @param taskInstId task instance id
* @return log byte array
*/
byte[] getLogBytes(User loginUser, int taskInstId);
byte[] getTaskLogBytes(User loginUser, int taskInstId);

/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
String queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit);
byte[] getTaskOutputBytes(User loginUser, int taskInstId);

/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
byte[] getLogBytes(User loginUser, long projectCode, int taskInstId);
}
Loading
Loading