diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java index 4f6f732c665a..c9b365b658aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java @@ -74,11 +74,36 @@ public class LoggerController extends BaseController { @GetMapping(value = "/detail") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) - public Result queryLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "taskInstanceId") int taskInstanceId, - @RequestParam(value = "skipLineNum") int skipNum, - @RequestParam(value = "limit") int limit) { - return loggerService.queryLog(loginUser, taskInstanceId, skipNum, limit); + public Result queryTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskInstanceId") int taskInstanceId, + @RequestParam(value = "skipLineNum") int skipNum, + @RequestParam(value = "limit") int limit) { + return loggerService.queryTaskLog(loginUser, taskInstanceId, skipNum, limit); + } + + /** + * query task output + * + * @param loginUser login user + * @param taskInstanceId task instance id + * @param skipNum skip number + * @param limit limit + * @return task log content + */ + @Operation(summary = "queryOutput", description = "QUERY_TASK_INSTANCE_OUTPUT_NOTES") + @Parameters({ + @Parameter(name = "taskInstanceId", description = "TASK_ID", required = true, schema = @Schema(implementation = int.class, example = "100")), + @Parameter(name = "skipLineNum", description = "SKIP_LINE_NUM", required = true, schema = @Schema(implementation = int.class, example = "100")), + @Parameter(name = "limit", description = "LIMIT", required = true, schema = @Schema(implementation = int.class, example = "100")) + }) + @GetMapping(value = "/output_detail") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) + public Result queryTaskOutput(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskInstanceId") int taskInstanceId, + @RequestParam(value = "skipLineNum") int skipNum, + @RequestParam(value = "limit") int limit) { + return loggerService.queryTaskOutput(loginUser, taskInstanceId, skipNum, limit); } /** @@ -97,7 +122,7 @@ public Result queryLog(@Parameter(hidden = true) @RequestAttrib @ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR) public ResponseEntity downloadTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "taskInstanceId") int taskInstanceId) { - byte[] logBytes = loggerService.getLogBytes(loginUser, taskInstanceId); + byte[] logBytes = loggerService.getTaskLogBytes(loginUser, taskInstanceId); return ResponseEntity .ok() .header(HttpHeaders.CONTENT_DISPOSITION, @@ -105,4 +130,28 @@ public ResponseEntity downloadTaskLog(@Parameter(hidden = true) @RequestAttribut .body(logBytes); } + /** + * download task output file + * + * @param loginUser login user + * @param taskInstanceId task instance id + * @return task output file content + */ + @Operation(summary = "downloadTaskOutput", description = "DOWNLOAD_TASK_INSTANCE_OUTPUT_NOTES") + @Parameters({ + @Parameter(name = "taskInstanceId", description = "TASK_ID", required = true, schema = @Schema(implementation = int.class, example = "100")) + }) + @GetMapping(value = "/download-output") + @ResponseBody + @ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR) + public ResponseEntity downloadTaskOutput(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskInstanceId") int taskInstanceId) { + byte[] outputBytes = loggerService.getTaskOutputBytes(loginUser, taskInstanceId); + return ResponseEntity + .ok() + .header(HttpHeaders.CONTENT_DISPOSITION, + "attachment; filename=\"" + System.currentTimeMillis() + ".output.log" + "\"") + .body(outputBytes); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java index ebe8b2e9fea2..a766735b5aa0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java @@ -41,8 +41,12 @@ public class LocalLogClient { * @param taskInstance The task instance object, containing information needed to retrieve the log. * @return The complete log file download response of the task instance, including log content and metadata. */ - public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance) { - return getLocalWholeLog(taskInstance); + public TaskInstanceLogFileDownloadResponse getTaskLog(TaskInstance taskInstance) { + return getLocalWholeLog(taskInstance, TaskLogType.LOG); + } + + public TaskInstanceLogFileDownloadResponse getTaskOutput(TaskInstance taskInstance) { + return getLocalWholeLog(taskInstance, TaskLogType.OUTPUT); } /** @@ -55,23 +59,28 @@ public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance * @param limit The maximum number of lines to read, indicating the maximum number of lines to retrieve in this query. * @return The partial log query response, including log content within the specified range and metadata. */ - public TaskInstanceLogPageQueryResponse getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) { - return getLocalPartLog(taskInstance, skipLineNum, limit); + public TaskInstanceLogPageQueryResponse getTaskLog(TaskInstance taskInstance, int skipLineNum, int limit) { + return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG); + } + + public TaskInstanceLogPageQueryResponse getTaskOutput(TaskInstance taskInstance, int skipLineNum, int limit) { + return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT); } - private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance) { + private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) { TaskInstanceLogFileDownloadRequest request = new TaskInstanceLogFileDownloadRequest( taskInstance.getId(), - taskInstance.getLogPath()); + taskLogType.getLogPath(taskInstance)); return getProxyLogService(taskInstance).getTaskInstanceWholeLogFileBytes(request); } private TaskInstanceLogPageQueryResponse getLocalPartLog(TaskInstance taskInstance, int skipLineNum, - int limit) { + int limit, TaskLogType taskLogType) { + String logFilePath = taskLogType.getLogPath(taskInstance); TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest .builder() .taskInstanceId(taskInstance.getId()) - .taskInstanceLogAbsolutePath(taskInstance.getLogPath()) + .taskInstanceLogAbsolutePath(logFilePath) .skipLineNum(skipLineNum) .limit(limit) .build(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java index ac5b6ecb2ba4..ec54b07af6a3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java @@ -50,19 +50,35 @@ public class LogClientDelegate { * @param limit The maximum number of log lines to retrieve. * @return A string containing the specified portion of the log. */ - public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit) { + + public String getTaskLogString(TaskInstance taskInstance, int skipLineNum, int limit) { + return getPartLogString(taskInstance, skipLineNum, limit, TaskLogType.LOG); + } + + public String getTaskOutputString(TaskInstance taskInstance, int skipLineNum, int limit) { + return getPartLogString(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT); + } + + private String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { checkArgs(taskInstance); if (checkNodeExists(taskInstance)) { - TaskInstanceLogPageQueryResponse response = localLogClient.getPartLog(taskInstance, skipLineNum, limit); + TaskInstanceLogPageQueryResponse response = + taskLogType == TaskLogType.LOG + ? localLogClient.getTaskLog(taskInstance, skipLineNum, limit) + : localLogClient.getTaskOutput(taskInstance, skipLineNum, limit); if (response.getCode() == LogResponseStatus.SUCCESS) { return response.getLogContent(); } else { log.warn("get part log string is not success for task instance {}; reason :{}", taskInstance.getId(), response.getMessage()); - return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit); + return taskLogType == TaskLogType.LOG + ? remoteLogClient.getTaskLogString(taskInstance, skipLineNum, limit) + : remoteLogClient.getTaskOutputString(taskInstance, skipLineNum, limit); } } else { - return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit); + return taskLogType == TaskLogType.LOG + ? remoteLogClient.getTaskLogString(taskInstance, skipLineNum, limit) + : remoteLogClient.getTaskOutputString(taskInstance, skipLineNum, limit); } } @@ -73,19 +89,34 @@ public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int l * @param taskInstance The task instance object, containing information needed for log retrieval. * @return A byte array containing the complete log content. */ - public byte[] getWholeLogBytes(TaskInstance taskInstance) { + public byte[] getTaskLogBytes(TaskInstance taskInstance) { + return getWholeLogBytes(taskInstance, TaskLogType.LOG); + } + + public byte[] getTaskOutputBytes(TaskInstance taskInstance) { + return getWholeLogBytes(taskInstance, TaskLogType.OUTPUT); + } + + private byte[] getWholeLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) { checkArgs(taskInstance); if (checkNodeExists(taskInstance)) { - TaskInstanceLogFileDownloadResponse response = localLogClient.getWholeLog(taskInstance); + TaskInstanceLogFileDownloadResponse response = + taskLogType == TaskLogType.LOG + ? localLogClient.getTaskLog(taskInstance) + : localLogClient.getTaskOutput(taskInstance); if (response.getCode() == LogResponseStatus.SUCCESS) { return response.getLogBytes(); } else { log.warn("get whole log bytes is not success for task instance {}; reason :{}", taskInstance.getId(), response.getMessage()); - return remoteLogClient.getWholeLog(taskInstance); + return taskLogType == TaskLogType.LOG + ? remoteLogClient.getTaskLogBytes(taskInstance) + : remoteLogClient.getTaskOutputBytes(taskInstance); } } else { - return remoteLogClient.getWholeLog(taskInstance); + return taskLogType == TaskLogType.LOG + ? remoteLogClient.getTaskLogBytes(taskInstance) + : remoteLogClient.getTaskOutputBytes(taskInstance); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java index 1b3542e96209..e6cc81424cd2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java @@ -32,8 +32,12 @@ public class RemoteLogClient { * @param taskInstance The task instance object, containing information such as the task ID and log path. * @return Returns the log content in byte array format. */ - public byte[] getWholeLog(TaskInstance taskInstance) { - return LogUtils.getFileContentBytesFromRemote(taskInstance.getLogPath()); + public byte[] getTaskLogBytes(TaskInstance taskInstance) { + return getWholeLog(taskInstance, TaskLogType.LOG); + } + + public byte[] getTaskOutputBytes(TaskInstance taskInstance) { + return getWholeLog(taskInstance, TaskLogType.OUTPUT); } /** @@ -45,10 +49,25 @@ public byte[] getWholeLog(TaskInstance taskInstance) { * @param limit The maximum number of lines to read. * @return Returns the specified part of the log content in string format. */ - public String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) { + + public String getTaskLogString(TaskInstance taskInstance, int skipLineNum, int limit) { + return getPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG); + } + + public String getTaskOutputString(TaskInstance taskInstance, int skipLineNum, int limit) { + return getPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT); + } + + private byte[] getWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) { + return LogUtils.getFileContentBytesFromRemote(taskLogType.getLogPath(taskInstance)); + } + + private String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { // todo We can optimize requests by the actual range, reducing disk usage and network traffic. return LogUtils.rollViewLogLines( - LogUtils.readPartFileContentFromRemote(taskInstance.getLogPath(), skipLineNum, limit)); + LogUtils.readPartFileContentFromRemote( + taskLogType.getLogPath(taskInstance), + skipLineNum, limit)); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java new file mode 100644 index 000000000000..7cc89a24ee72 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.logging; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +public enum TaskLogType { + + LOG { + + @Override + public String getLogPath(TaskInstance taskInstance) { + return taskInstance.getLogPath(); + } + }, + OUTPUT { + + @Override + public String getLogPath(TaskInstance taskInstance) { + return taskInstance.getTaskOutPutLogPath(); + } + }; + + public abstract String getLogPath(TaskInstance taskInstance); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index fe94f7ec43bc..4ba59348ce09 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -35,7 +35,9 @@ public interface LoggerService { * @param limit limit * @return log string data */ - Result queryLog(User loginUser, int taskInstId, int skipLineNum, int limit); + Result queryTaskLog(User loginUser, int taskInstId, int skipLineNum, int limit); + + Result queryTaskOutput(User loginUser, int taskInstId, int skipLineNum, int limit); /** * get log size @@ -44,27 +46,8 @@ public interface LoggerService { * @param taskInstId task instance id * @return log byte array */ - byte[] getLogBytes(User loginUser, int taskInstId); + byte[] getTaskLogBytes(User loginUser, int taskInstId); - /** - * query log - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @param skipLineNum skip line number - * @param limit limit - * @return log string data - */ - String queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit); + byte[] getTaskOutputBytes(User loginUser, int taskInstId); - /** - * get log bytes - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @return log byte array - */ - byte[] getLogBytes(User loginUser, long projectCode, int taskInstId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index a0b3b35fc1c7..821f489a54e8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -23,17 +23,16 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.executor.logging.LogClientDelegate; +import org.apache.dolphinscheduler.api.executor.logging.TaskLogType; import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.commons.lang3.StringUtils; @@ -65,9 +64,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService @Autowired private ProjectService projectService; - @Autowired - private TaskDefinitionMapper taskDefinitionMapper; - @Autowired private LogClientDelegate logClientDelegate; @@ -82,8 +78,21 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService */ @Override @SuppressWarnings("unchecked") - public Result queryLog(User loginUser, int taskInstId, int skipLineNum, int limit) { + public Result queryTaskLog(User loginUser, int taskInstId, int skipLineNum, int limit) { + return queryLog(loginUser, taskInstId, skipLineNum, limit, TaskLogType.LOG); + } + @Override + @SuppressWarnings("unchecked") + public Result queryTaskOutput(User loginUser, int taskInstId, int skipLineNum, int limit) { + return queryLog(loginUser, taskInstId, skipLineNum, limit, TaskLogType.OUTPUT); + } + + private Result queryLog(User loginUser, + int taskInstId, + int skipLineNum, + int limit, + TaskLogType taskLogType) { TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); if (taskInstance == null) { @@ -96,7 +105,7 @@ public Result queryLog(User loginUser, int taskInstId, int skip } projectService.checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); - String log = queryLog(taskInstance, skipLineNum, limit); + String log = queryLog(taskInstance, skipLineNum, limit, taskLogType); int lineNum = log.split("\\r\\n").length; result.setData(new ResponseTaskLog(lineNum, log)); return result; @@ -110,68 +119,23 @@ public Result queryLog(User loginUser, int taskInstId, int skip * @return log byte array */ @Override - public byte[] getLogBytes(User loginUser, int taskInstId) { - TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); - if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { - throw new ServiceException("task instance is null or host is null"); - } - Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId); - projectService.checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); - return getLogBytes(taskInstance); + public byte[] getTaskLogBytes(User loginUser, int taskInstId) { + return getLogBytes(loginUser, taskInstId, TaskLogType.LOG); } - /** - * query log - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @param skipLineNum skip line number - * @param limit limit - * @return log string data - */ @Override - @SuppressWarnings("unchecked") - public String queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) { - // check user access for project - projectService.checkProjectAndAuthThrowException(loginUser, projectCode, VIEW_LOG); - // check whether the task instance can be found - TaskInstance task = taskInstanceDao.queryById(taskInstId); - if (task == null || StringUtils.isBlank(task.getHost())) { - throw new ServiceException(Status.TASK_INSTANCE_NOT_FOUND); - } - - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); - if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { - throw new ServiceException(Status.TASK_INSTANCE_NOT_FOUND, taskInstId); - } - return queryLog(task, skipLineNum, limit); + public byte[] getTaskOutputBytes(User loginUser, int taskInstId) { + return getLogBytes(loginUser, taskInstId, TaskLogType.OUTPUT); } - /** - * get log bytes - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @return log byte array - */ - @Override - public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) { - // check user access for project - projectService.checkProjectAndAuthThrowException(loginUser, projectCode, DOWNLOAD_LOG); - - // check whether the task instance can be found - TaskInstance task = taskInstanceDao.queryById(taskInstId); - if (task == null || StringUtils.isBlank(task.getHost())) { + private byte[] getLogBytes(User loginUser, int taskInstId, TaskLogType taskLogType) { + TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); + if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { throw new ServiceException("task instance is null or host is null"); } - - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); - if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { - throw new ServiceException("task instance does not exist in project"); - } - return getLogBytes(task); + Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId); + projectService.checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); + return getLogBytes(taskInstance, taskLogType); } /** @@ -182,8 +146,8 @@ public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) { * @param limit limit * @return log string data */ - private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { - final String logPath = taskInstance.getLogPath(); + private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { + String logPath = taskLogType.getLogPath(taskInstance); log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host: {}, logPath:{}", taskInstance.getId(), taskInstance.getName(), taskInstance.getHost(), logPath); if (StringUtils.isBlank(logPath)) { @@ -192,7 +156,7 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { } StringBuilder sb = new StringBuilder(); - if (skipLineNum == 0) { + if (shouldAppendLogHead(taskLogType) && skipLineNum == 0) { String head = String.format(LOG_HEAD_FORMAT, logPath, taskInstance.getHost(), @@ -201,7 +165,9 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { } try { - String logContent = logClientDelegate.getPartLogString(taskInstance, skipLineNum, limit); + String logContent = taskLogType == TaskLogType.LOG + ? logClientDelegate.getTaskLogString(taskInstance, skipLineNum, limit) + : logClientDelegate.getTaskOutputString(taskInstance, skipLineNum, limit); if (logContent != null) { sb.append(logContent); } @@ -217,9 +183,9 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { * @param taskInstance task instance * @return log byte array */ - private byte[] getLogBytes(TaskInstance taskInstance) { + private byte[] getLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) { String host = taskInstance.getHost(); - String logPath = taskInstance.getLogPath(); + String logPath = taskLogType.getLogPath(taskInstance); byte[] head = String.format(LOG_HEAD_FORMAT, logPath, @@ -229,11 +195,20 @@ private byte[] getLogBytes(TaskInstance taskInstance) { byte[] logBytes; try { - logBytes = logClientDelegate.getWholeLogBytes(taskInstance); + logBytes = taskLogType == TaskLogType.LOG + ? logClientDelegate.getTaskLogBytes(taskInstance) + : logClientDelegate.getTaskOutputBytes(taskInstance); + if (!shouldAppendLogHead(taskLogType)) { + return logBytes; + } return Bytes.concat(head, logBytes); } catch (Exception ex) { log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex); throw new ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR); } } + + private boolean shouldAppendLogHead(TaskLogType taskLogType) { + return taskLogType == TaskLogType.LOG; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java index 38ed86900790..db8107b1931a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java @@ -116,7 +116,7 @@ public void testGetWholeLogSuccess() { taskInstance.setId(1); taskInstance.setLogPath("/path/to/log"); - TaskInstanceLogFileDownloadResponse actualResponse = localLogClient.getWholeLog(taskInstance); + TaskInstanceLogFileDownloadResponse actualResponse = localLogClient.getTaskLog(taskInstance); assertNotNull(actualResponse); assertArrayEquals("".getBytes(), actualResponse.getLogBytes()); @@ -129,7 +129,7 @@ public void testGetPartLogSuccess() { taskInstance.setHost("127.0.0.1:" + nettyServerPort); taskInstance.setLogPath("/path/to/log"); - TaskInstanceLogPageQueryResponse actualResponse = localLogClient.getPartLog(taskInstance, 0, 10); + TaskInstanceLogPageQueryResponse actualResponse = localLogClient.getTaskLog(taskInstance, 0, 10); assertNotNull(actualResponse); assertEquals("Partial log content", actualResponse.getLogContent()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java index edf85268a723..87df4eca54b5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java @@ -54,7 +54,7 @@ public class LogClientDelegateTest { @Test public void testGetPartLogStringTaskInstanceNullThrowsException() { - assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getPartLogString(null, 0, 10)); + assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getTaskLogString(null, 0, 10)); } @Test @@ -64,12 +64,26 @@ public void testGetPartLogStringNodeExistsLocalSuccess() { taskInstance.setHost("localhost"); taskInstance.setTaskType("SHELL"); when(registryClient.checkNodeExists(eq(taskInstance.getHost()), any())).thenReturn(true); - when(localLogClient.getPartLog(taskInstance, 0, 10)) + when(localLogClient.getTaskLog(taskInstance, 0, 10)) .thenReturn(new TaskInstanceLogPageQueryResponse("logContent", LogResponseStatus.SUCCESS, "")); - String result = logClientDelegate.getPartLogString(taskInstance, 0, 10); + String result = logClientDelegate.getTaskLogString(taskInstance, 0, 10); assertEquals("logContent", result); } + @Test + public void testGetTaskOutputStringNodeExistsLocalSuccess() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setHost("localhost"); + taskInstance.setTaskType("SHELL"); + when(registryClient.checkNodeExists(eq(taskInstance.getHost()), any())).thenReturn(true); + when(localLogClient.getTaskOutput(taskInstance, 0, 10)) + .thenReturn(new TaskInstanceLogPageQueryResponse("outputContent", LogResponseStatus.SUCCESS, "")); + + String result = logClientDelegate.getTaskOutputString(taskInstance, 0, 10); + assertEquals("outputContent", result); + } + @Test public void testGetPartLogStringNodeExistsLocalFailure() { TaskInstance taskInstance = new TaskInstance(); @@ -78,11 +92,11 @@ public void testGetPartLogStringNodeExistsLocalFailure() { taskInstance.setTaskType("SHELL"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.WORKER)).thenReturn(true); - when(localLogClient.getPartLog(taskInstance, 0, 10)).thenReturn( + when(localLogClient.getTaskLog(taskInstance, 0, 10)).thenReturn( new TaskInstanceLogPageQueryResponse(null, LogResponseStatus.ERROR, "error")); - when(remoteLogClient.getPartLog(taskInstance, 0, 10)).thenReturn("remoteLogContent"); + when(remoteLogClient.getTaskLogString(taskInstance, 0, 10)).thenReturn("remoteLogContent"); - String result = logClientDelegate.getPartLogString(taskInstance, 0, 10); + String result = logClientDelegate.getTaskLogString(taskInstance, 0, 10); assertEquals("remoteLogContent", result); } @@ -94,15 +108,15 @@ public void testGetPartLogStringNodeNotExists() { taskInstance.setTaskType("SHELL"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.WORKER)).thenReturn(false); - when(remoteLogClient.getPartLog(taskInstance, 0, 10)).thenReturn("remoteLogContent"); + when(remoteLogClient.getTaskLogString(taskInstance, 0, 10)).thenReturn("remoteLogContent"); - String result = logClientDelegate.getPartLogString(taskInstance, 0, 10); + String result = logClientDelegate.getTaskLogString(taskInstance, 0, 10); assertEquals("remoteLogContent", result); } @Test public void testGetWholeLogBytesTaskInstanceNullThrowsException() { - assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getWholeLogBytes(null)); + assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getTaskLogBytes(null)); } @Test @@ -113,10 +127,10 @@ public void testGetWholeLogBytesNodeExistsLocalSuccess() { taskInstance.setTaskType("SWITCH"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.MASTER)).thenReturn(true); - when(localLogClient.getWholeLog(taskInstance)).thenReturn( + when(localLogClient.getTaskLog(taskInstance)).thenReturn( new TaskInstanceLogFileDownloadResponse("logBytes".getBytes(), LogResponseStatus.SUCCESS, null)); - byte[] result = logClientDelegate.getWholeLogBytes(taskInstance); + byte[] result = logClientDelegate.getTaskLogBytes(taskInstance); assertArrayEquals("logBytes".getBytes(), result); } @@ -128,11 +142,11 @@ public void testGetWholeLogBytesNodeExistsLocalFailure() { taskInstance.setTaskType("SWITCH"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.MASTER)).thenReturn(true); - when(localLogClient.getWholeLog(taskInstance)).thenReturn( + when(localLogClient.getTaskLog(taskInstance)).thenReturn( new TaskInstanceLogFileDownloadResponse(null, LogResponseStatus.ERROR, "error")); - when(remoteLogClient.getWholeLog(taskInstance)).thenReturn("remoteLogBytes".getBytes()); + when(remoteLogClient.getTaskLogBytes(taskInstance)).thenReturn("remoteLogBytes".getBytes()); - byte[] result = logClientDelegate.getWholeLogBytes(taskInstance); + byte[] result = logClientDelegate.getTaskLogBytes(taskInstance); assertArrayEquals("remoteLogBytes".getBytes(), result); } @@ -144,9 +158,9 @@ public void testGetWholeLogBytesNodeNotExists() { taskInstance.setTaskType("SWITCH"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.MASTER)).thenReturn(false); - when(remoteLogClient.getWholeLog(taskInstance)).thenReturn("remoteLogBytes".getBytes()); + when(remoteLogClient.getTaskLogBytes(taskInstance)).thenReturn("remoteLogBytes".getBytes()); - byte[] result = logClientDelegate.getWholeLogBytes(taskInstance); + byte[] result = logClientDelegate.getTaskLogBytes(taskInstance); assertArrayEquals("remoteLogBytes".getBytes(), result); } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 8788696cbf9c..4c6fcf577a15 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -17,11 +17,9 @@ package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow; import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doNothing; @@ -34,20 +32,13 @@ import org.apache.dolphinscheduler.api.executor.logging.LogClientDelegate; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -77,9 +68,6 @@ public class LoggerServiceTest { @Mock private ProjectService projectService; - @Mock - private TaskDefinitionMapper taskDefinitionMapper; - @Mock private LogClientDelegate logClientDelegate; @@ -93,13 +81,13 @@ public void testQueryLog() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setExecutorId(loginUser.getId() + 1); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - Result result = loggerService.queryLog(loginUser, 2, 1, 1); + Result result = loggerService.queryTaskLog(loginUser, 2, 1, 1); // TASK_INSTANCE_NOT_FOUND Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); try { // HOST NOT FOUND OR ILLEGAL - result = loggerService.queryLog(loginUser, 1, 1, 1); + result = loggerService.queryTaskLog(loginUser, 1, 1, 1); } catch (RuntimeException e) { Assertions.assertTrue(true); logger.error("testQueryDataSourceList error {}", e.getMessage()); @@ -112,27 +100,27 @@ public void testQueryLog() { doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.queryTaskLog(loginUser, 1, 1, 1)); // USER_NO_OPERATION_PERM doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.queryTaskLog(loginUser, 1, 1, 1)); // SUCCESS doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - result = loggerService.queryLog(loginUser, 1, 1, 1); + result = loggerService.queryTaskLog(loginUser, 1, 1, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); - result = loggerService.queryLog(loginUser, 1, 0, 1); + result = loggerService.queryTaskLog(loginUser, 1, 0, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); taskInstance.setLogPath(""); assertThrowsServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.queryTaskLog(loginUser, 1, 1, 1)); } @Test @@ -140,6 +128,8 @@ public void testGetLogBytes() { User loginUser = new User(); loginUser.setId(1); + Project project = new Project(); + project.setCode(1L); TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); taskInstance.setExecutorId(loginUser.getId() + 1); @@ -147,7 +137,7 @@ public void testGetLogBytes() { // task instance is null try { - loggerService.getLogBytes(loginUser, 2); + loggerService.getTaskLogBytes(loginUser, 2); } catch (ServiceException e) { Assertions.assertEquals(new ServiceException("task instance is null or host is null").getMessage(), e.getMessage()); @@ -156,7 +146,7 @@ public void testGetLogBytes() { // task instance host is null try { - loggerService.getLogBytes(loginUser, 1); + loggerService.getTaskLogBytes(loginUser, 1); } catch (ServiceException e) { Assertions.assertEquals(new ServiceException("task instance is null or host is null").getMessage(), e.getMessage()); @@ -166,138 +156,54 @@ public void testGetLogBytes() { // PROJECT_NOT_EXIST taskInstance.setHost("127.0.0.1:" + nettyServerPort); taskInstance.setLogPath("/temp/log"); + when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + .checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.getTaskLogBytes(loginUser, 1)); // USER_NO_OPERATION_PERM doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + .checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.getTaskLogBytes(loginUser, 1)); // SUCCESS - when(logClientDelegate.getWholeLogBytes(any())).thenReturn(new byte[0]); - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), - DOWNLOAD_LOG); - when(logClientDelegate.getWholeLogBytes(any())).thenReturn(new byte[0]); - byte[] logBytes = loggerService.getLogBytes(loginUser, 1); + when(logClientDelegate.getTaskLogBytes(any())).thenReturn(new byte[0]); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); + when(logClientDelegate.getTaskLogBytes(any())).thenReturn(new byte[0]); + byte[] logBytes = loggerService.getTaskLogBytes(loginUser, 1); Assertions.assertEquals(42, logBytes.length - String.valueOf(nettyServerPort).length()); } @Test - public void testQueryLogInSpecifiedProject() { - long projectCode = 1L; + public void testQueryTaskOutputAndGetOutputBytes() { User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); + loginUser.setId(1); + Project project = new Project(); + project.setCode(1L); TaskInstance taskInstance = new TaskInstance(); - when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - when(taskInstanceDao.queryById(10)).thenReturn(null); - - assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND, - () -> loggerService.queryLog(loginUser, projectCode, 10, 1, 1)); - - TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); - taskDefinition.setCode(1L); - - // SUCCESS - taskInstance.setTaskCode(1L); taskInstance.setId(1); + taskInstance.setExecutorId(loginUser.getId() + 1); taskInstance.setHost("127.0.0.1:" + nettyServerPort); - taskInstance.setLogPath("/temp/log"); - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, projectCode, VIEW_LOG); + taskInstance.setTaskOutPutLogPath("/temp/output.log"); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); - assertDoesNotThrow(() -> loggerService.queryLog(loginUser, projectCode, 1, 1, 1)); - - taskDefinition.setProjectCode(10); - assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND, - () -> loggerService.queryLog(loginUser, projectCode, 1, 1, 1)); - - taskDefinition.setProjectCode(1); - taskInstance.setId(10); - when(taskInstanceDao.queryById(10)).thenReturn(taskInstance); - - when(logClientDelegate.getPartLogString(any(), anyInt(), anyInt())).thenReturn("log content"); - - String result = loggerService.queryLog(loginUser, projectCode, 10, 1, 1); - assertEquals("log content", result); - taskInstance.setId(100); - when(taskInstanceDao.queryById(100)).thenReturn(taskInstance); - doThrow(new ServiceException("query log error")).when(logClientDelegate).getPartLogString(any(), anyInt(), - anyInt()); - assertThrowsServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, - () -> loggerService.queryLog(loginUser, projectCode, 10, 1, 1)); - } - - @Test - public void testGetLogBytesInSpecifiedProject() { - long projectCode = 1L; - when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - TaskInstance taskInstance = new TaskInstance(); - TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); - taskDefinition.setCode(1L); - // SUCCESS - taskInstance.setTaskCode(1L); - taskInstance.setId(1); - taskInstance.setHost("127.0.0.1:" + nettyServerPort); - taskInstance.setLogPath("/temp/log"); - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, projectCode, DOWNLOAD_LOG); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), + VIEW_LOG); + when(logClientDelegate.getTaskOutputString(any(), anyInt(), anyInt())).thenReturn("output content"); - when(taskInstanceDao.queryById(1)).thenReturn(null); - assertThrowsServiceException( - Status.INTERNAL_SERVER_ERROR_ARGS, () -> loggerService.getLogBytes(loginUser, projectCode, 1)); + Result result = loggerService.queryTaskOutput(loginUser, 1, 1, 1); + Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); + Assertions.assertEquals("output content", result.getData().getMessage()); - when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); - when(logClientDelegate.getWholeLogBytes(any())).thenReturn(new byte[0]); - assertDoesNotThrow(() -> loggerService.getLogBytes(loginUser, projectCode, 1)); - - taskDefinition.setProjectCode(2L); - assertThrowsServiceException(Status.INTERNAL_SERVER_ERROR_ARGS, - () -> loggerService.getLogBytes(loginUser, projectCode, 1)); - - taskDefinition.setProjectCode(1L); - taskInstance.setId(100); - when(taskInstanceDao.queryById(100)).thenReturn(taskInstance); - doThrow(new ServiceException("download error")).when(logClientDelegate).getWholeLogBytes(any()); - assertThrowsServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR, - () -> loggerService.getLogBytes(loginUser, projectCode, 100)); - } + when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); + when(logClientDelegate.getTaskOutputBytes(any())).thenReturn(new byte[0]); - /** - * get mock Project - * - * @param projectCode projectCode - * @return Project - */ - private Project getProject(long projectCode) { - Project project = new Project(); - project.setCode(projectCode); - project.setId(1); - project.setName("test"); - project.setUserId(1); - return project; + byte[] outputBytes = loggerService.getTaskOutputBytes(loginUser, 1); + Assertions.assertEquals(0, outputBytes.length); } - private void putMsg(Map result, Status status, Object... statusParams) { - result.put(Constants.STATUS, status); - if (statusParams != null && statusParams.length > 0) { - result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams)); - } else { - result.put(Constants.MSG, status.getMsg()); - } - } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java index 592231fffc57..64cc8db79d32 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java @@ -500,7 +500,6 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException { .thenReturn(Optional.of(workflowInstance)); when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId())) .thenReturn(taskInstanceList); - when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res); when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList, ContextType.DEPENDENT_RESULT_CONTEXT)) .thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0])); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index f94e88b59513..8a787afa91da 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -80,6 +80,7 @@ public class TaskInstance implements Serializable { private String executePath; private String logPath; + private String taskOutPutLogPath; private int retryTimes; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java index 355fdf76bca0..322bc6d93098 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java @@ -49,7 +49,8 @@ public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecution } private void assemblyTaskLogPath(final TaskExecutionContext taskExecutionContext) { - taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); + taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext, "log")); + taskExecutionContext.setTaskOutputLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext, "out")); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java index e95d0c1e3f42..c8332690f544 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java @@ -36,6 +36,7 @@ public class TaskRunningLifecycleEvent extends AbstractTaskLifecycleEvent { private final ITaskExecutionRunnable taskExecutionRunnable; private final String logPath; + private final String taskOutPutLogPath; private final Date startTime; @@ -49,6 +50,7 @@ public String toString() { return "TaskRunningLifecycleEvent{" + "task=" + taskExecutionRunnable.getName() + ", logPath='" + logPath + '\'' + + ", taskOutPutLogPath='" + taskOutPutLogPath + '\'' + ", startTime=" + startTime + '}'; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/AbstractTaskInstanceFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/AbstractTaskInstanceFactory.java index 160d0d56af7b..6eeadee7e6d4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/AbstractTaskInstanceFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/AbstractTaskInstanceFactory.java @@ -50,6 +50,7 @@ protected TaskInstance cloneTaskInstance(TaskInstance originTaskInstance) { result.setEndTime(originTaskInstance.getEndTime()); result.setHost(originTaskInstance.getHost()); result.setExecutePath(originTaskInstance.getExecutePath()); + result.setTaskOutPutLogPath(originTaskInstance.getTaskOutPutLogPath()); result.setLogPath(originTaskInstance.getLogPath()); result.setRetryTimes(originTaskInstance.getRetryTimes()); result.setAlertFlag(originTaskInstance.getAlertFlag()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java index 88281145c67c..9921623ba87f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java @@ -160,6 +160,7 @@ protected void persistentTaskInstanceStartedEventToDB(final ITaskExecutionRunnab taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(taskRunningEvent.getStartTime()); taskInstance.setLogPath(taskRunningEvent.getLogPath()); + taskInstance.setTaskOutPutLogPath(taskRunningEvent.getTaskOutPutLogPath()); taskInstanceDao.updateById(taskInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java index 94df43ede4d0..dc7ee206cfc1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java @@ -79,6 +79,7 @@ public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskEx .taskExecutionRunnable(taskExecutionRunnable) .startTime(new Date(taskExecutorStartedLifecycleEvent.getStartTime())) .logPath(taskExecutorStartedLifecycleEvent.getLogPath()) + .taskOutPutLogPath(taskExecutorStartedLifecycleEvent.getTaskOutputLogPath()) .build(); taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent); diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index a4c3a4f22dfe..2b995f757e9f 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -33,7 +33,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -51,6 +53,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-master.log @@ -77,4 +98,8 @@ + + + + diff --git a/dolphinscheduler-master/src/test/resources/logback.xml b/dolphinscheduler-master/src/test/resources/logback.xml index 1490f5b55684..70dee6aa3c80 100644 --- a/dolphinscheduler-master/src/test/resources/logback.xml +++ b/dolphinscheduler-master/src/test/resources/logback.xml @@ -33,7 +33,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -51,6 +53,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-master.log @@ -73,4 +94,8 @@ + + + + diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index 1380995f9032..6e4488970bc5 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -54,7 +54,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -72,6 +74,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + @@ -83,4 +104,8 @@ + + + + diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java index 94126bb39d25..d27924302b79 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java @@ -42,6 +42,7 @@ public class TaskExecutorStartedLifecycleEvent extends AbstractTaskExecutorLifec private long startTime; private String logPath; + private String taskOutputLogPath; private String executePath; @@ -56,6 +57,7 @@ public static TaskExecutorStartedLifecycleEvent of(final ITaskExecutor taskExecu .taskInstanceHost(taskExecutionContext.getHost()) .startTime(taskExecutor.getTaskExecutionContext().getStartTime()) .logPath(taskExecutionContext.getLogPath()) + .taskOutputLogPath(taskExecutionContext.getTaskOutputLogPath()) .executePath(taskExecutionContext.getExecutePath()) .type(TaskExecutorLifecycleEventType.RUNNING) .build(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 86199c874e19..af44d0f27bd6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -47,6 +47,10 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.client.dsl.LogWatch; /** @@ -55,6 +59,8 @@ @Slf4j public abstract class AbstractCommandExecutor { + private static final Logger TASK_OUTPUT_LOGGER = LoggerFactory.getLogger(LogUtils.TASK_OUTPUT_LOGGER_NAME); + protected volatile Map taskOutputParams = new HashMap<>(); private Process process; @@ -227,15 +233,22 @@ private CompletableFuture collectProcessLog(Process process) { TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - String line; - while ((line = inReader.readLine()) != null) { - log.info(" -> {}", line); - taskOutputParameterParser.appendParseLog(line); + try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskRequest.getTaskOutputLogPath())) { + for (String line : (Iterable) inReader.lines()::iterator) { + if (StringUtils.isBlank(taskRequest.getTaskOutputLogPath())) { + log.info(" -> {}", line); + } else { + TASK_OUTPUT_LOGGER.info(line); + } + taskOutputParameterParser.appendParseLog(line); + } + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); } } catch (Exception e) { log.error("Parse var pool error", e); - } finally { - LogUtils.removeTaskInstanceLogFullPathMDC(); } taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); }, collectProcessLogService); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 94b4afd2abb1..cb5612b9ce95 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -63,6 +63,8 @@ public class TaskExecutionContext implements Serializable { private String logPath; + private String taskOutputLogPath; + private String appInfoPath; private int processId; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f48169c5e150..0d5223d0bedc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -63,6 +63,10 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.Affinity; import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.EnvVar; @@ -84,6 +88,8 @@ @Slf4j public class K8sTaskExecutor extends AbstractK8sTaskExecutor { + private static final Logger TASK_OUTPUT_LOGGER = LoggerFactory.getLogger(LogUtils.TASK_OUTPUT_LOGGER_NAME); + private Job job; protected boolean podLogOutputIsFinished = false; protected Future podLogOutputFuture; @@ -270,12 +276,18 @@ private void parsePodLogOutput() { taskRequest.getTaskInstanceId()); LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskRequest.getTaskOutputLogPath()); LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId(), containerName)) { String line; try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { while ((line = reader.readLine()) != null) { - log.info("[K8S-pod-log] {}", line); + if (StringUtils.isBlank(taskRequest.getTaskOutputLogPath())) { + log.info("[K8S-pod-log] {}", line); + } else { + TASK_OUTPUT_LOGGER.info(line); + } taskOutputParameterParser.appendParseLog(line); } } @@ -283,6 +295,7 @@ private void parsePodLogOutput() { throw new RuntimeException(e); } finally { LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); podLogOutputIsFinished = true; } taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java index 89a11e14fe7a..d2aaf34f0526 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java @@ -17,14 +17,9 @@ package org.apache.dolphinscheduler.plugin.task.api.log; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; - import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; - -import org.slf4j.MDC; - import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.sift.AbstractDiscriminator; @@ -42,10 +37,11 @@ public class TaskLogDiscriminator extends AbstractDiscriminator { @Override public String getDiscriminatingValue(ILoggingEvent event) { - String taskInstanceLogPath = MDC.get(LogUtils.TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); - if (taskInstanceLogPath == null) { - log.error("The task instance log path is null, please check the logback configuration, log: {}", event); + String taskLogPath = event.getMDCPropertyMap().get(key); + if (taskLogPath == null) { + log.error("The task log path in MDC key {} is null, please check the logback configuration, log: {}", + key, event); } - return taskInstanceLogPath; + return taskLogPath; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java index 6f9d9ec90b40..29e5d72964e6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java @@ -17,13 +17,12 @@ package org.apache.dolphinscheduler.plugin.task.api.log; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; - import org.apache.commons.lang3.StringUtils; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.slf4j.MDC; import org.slf4j.Marker; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -34,13 +33,17 @@ * This class is used to filter the log of the task instance. */ @Slf4j +@Getter +@Setter public class TaskLogFilter extends Filter { + private String key; + @Override public FilterReply decide(ILoggingEvent event) { - String taskInstanceLogPath = MDC.get(LogUtils.TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); - // If the taskInstanceLogPath is empty, it means that the log is not related to a task instance. - if (StringUtils.isEmpty(taskInstanceLogPath)) { + String taskLogPath = event.getMDCPropertyMap().get(key); + // If the taskLogPath is empty, it means that the log is not related to the current log file. + if (StringUtils.isEmpty(taskLogPath)) { return FilterReply.DENY; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java index b251b33f65aa..621d353c108a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -59,6 +59,8 @@ public class LogUtils { private static final Path TASK_INSTANCE_LOG_BASE_PATH = getTaskInstanceLogBasePath(); public static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY = "taskInstanceLogFullPath"; + public static final String TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY = "taskOutputLogFullPath"; + public static final String TASK_OUTPUT_LOGGER_NAME = "TaskOutput"; private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX); @@ -86,13 +88,13 @@ public List getAppIds(String logPath, String appInfoPath, String fetchWa * @param taskExecutionContext task execution context. * @return task instance log full path. */ - public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) { + public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext, String logType) { return getTaskInstanceLogFullPath( DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), taskExecutionContext.getWorkflowDefinitionCode(), taskExecutionContext.getWorkflowDefinitionVersion(), taskExecutionContext.getWorkflowInstanceId(), - taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.getTaskInstanceId(), logType); } /** @@ -110,7 +112,8 @@ public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime, Long workflowDefinitionCode, int workflowDefinitionVersion, int workflowInstanceId, - int taskInstanceId) { + int taskInstanceId, + String logType) { if (TASK_INSTANCE_LOG_BASE_PATH == null) { throw new IllegalArgumentException( "Cannot find the task instance log base path, please check your logback.xml file"); @@ -119,7 +122,7 @@ public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime, String.valueOf(workflowDefinitionCode), String.valueOf(workflowDefinitionVersion), String.valueOf(workflowInstanceId), - String.format("%s.log", taskInstanceId)).toString(); + String.format("%s.%s", taskInstanceId, logType)).toString(); return TASK_INSTANCE_LOG_BASE_PATH .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null)) .resolve(taskLogFileName) @@ -187,6 +190,10 @@ public static String getTaskInstanceLogFullPathMdc() { return MDC.get(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); } + public static String getTaskOutputLogFullPathMdc() { + return MDC.get(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY); + } + public static void setTaskInstanceLogFullPathMDC(String taskInstanceLogFullPath) { if (taskInstanceLogFullPath == null) { log.warn("taskInstanceLogFullPath is null"); @@ -199,6 +206,37 @@ public static void removeTaskInstanceLogFullPathMDC() { MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); } + public static void setTaskOutputLogFullPathMDC(String taskOutputLogFullPath) { + if (taskOutputLogFullPath == null) { + log.warn("taskOutputLogFullPath is null"); + return; + } + MDC.put(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY, taskOutputLogFullPath); + } + + public static void removeTaskOutputLogFullPathMDC() { + MDC.remove(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY); + } + + public static MDCAutoClosableContext withTaskOutputLogPathMDC(String taskOutputLogFullPath) { + final String originalTaskOutputLogFullPath = getTaskOutputLogFullPathMdc(); + if (taskOutputLogFullPath == null) { + removeTaskOutputLogFullPathMDC(); + } else { + setTaskOutputLogFullPathMDC(taskOutputLogFullPath); + } + return new MDCAutoClosableContext( + () -> restoreMDC(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY, originalTaskOutputLogFullPath)); + } + + private static void restoreMDC(String key, String value) { + if (value == null) { + MDC.remove(key); + return; + } + MDC.put(key, value); + } + public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) { MDC.put(TaskConstants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminatorTest.java new file mode 100644 index 000000000000..10e67175c20b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminatorTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import ch.qos.logback.classic.spi.ILoggingEvent; + +class TaskLogDiscriminatorTest { + + @Test + void shouldGetDiscriminatingValueByConfiguredMdcKey() { + TaskLogDiscriminator discriminator = new TaskLogDiscriminator(); + discriminator.setKey("taskOutputLogFullPath"); + + ILoggingEvent event = mock(ILoggingEvent.class); + when(event.getMDCPropertyMap()) + .thenReturn(Collections.singletonMap("taskOutputLogFullPath", "/tmp/task-output.log")); + + assertEquals("/tmp/task-output.log", discriminator.getDiscriminatingValue(event)); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilterTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilterTest.java new file mode 100644 index 000000000000..4900a61ce568 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilterTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.FilterReply; + +class TaskLogFilterTest { + + @Test + void shouldAcceptLogWhenConfiguredMdcKeyExists() { + TaskLogFilter filter = new TaskLogFilter(); + filter.setKey("taskOutputLogFullPath"); + + ILoggingEvent event = mock(ILoggingEvent.class); + when(event.getMDCPropertyMap()) + .thenReturn(Collections.singletonMap("taskOutputLogFullPath", "/tmp/task-output.log")); + + assertEquals(FilterReply.ACCEPT, filter.decide(event)); + } + + @Test + void shouldDenyLogWhenConfiguredMdcKeyMissing() { + TaskLogFilter filter = new TaskLogFilter(); + filter.setKey("taskOutputLogFullPath"); + + ILoggingEvent event = mock(ILoggingEvent.class); + when(event.getMDCPropertyMap()).thenReturn(Collections.emptyMap()); + + assertEquals(FilterReply.DENY, filter.decide(event)); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 33b85e1a66b8..884fd9156e4c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -60,12 +61,18 @@ import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; @Slf4j public class SqlTask extends AbstractTask { + private static final Logger TASK_OUTPUT_LOGGER = LoggerFactory.getLogger(LogUtils.TASK_OUTPUT_LOGGER_NAME); + private final TaskExecutionContext taskExecutionContext; private final SqlParameters sqlParameters; @@ -272,11 +279,8 @@ private String resultProcess(ResultSet resultSet) throws Exception { int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS; displayRows = Math.min(displayRows, resultJSONArray.size()); - log.info("display sql result {} rows as follows:", displayRows); - for (int i = 0; i < displayRows; i++) { - String row = JSONUtils.toJsonString(resultJSONArray.get(i)); - log.info("row {} : {}", i + 1, row); - } + + logSqlResultPreview(columnLabels, resultJSONArray, displayRows); } String result = resultJSONArray.isEmpty() ? JSONUtils.toJsonString(generateEmptyRow(resultSet)) @@ -300,7 +304,6 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException { if (resultSet != null) { ResultSetMetaData metaData = resultSet.getMetaData(); int columnsNum = metaData.getColumnCount(); - log.info("sql query results is empty"); for (int i = 1; i <= columnsNum; i++) { emptyOfColValues.set(metaData.getColumnLabel(i), JSONUtils.toJsonNode("")); } @@ -311,6 +314,52 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException { return resultJSONArray; } + private void logSqlResultPreview(String[] columnLabels, ArrayNode resultJSONArray, int displayRows) { + if (StringUtils.isBlank(taskExecutionContext.getTaskOutputLogPath())) { + doLogSqlResultPreview(columnLabels, resultJSONArray, displayRows); + return; + } + + try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskExecutionContext.getTaskOutputLogPath())) { + doLogSqlResultPreview(columnLabels, resultJSONArray, displayRows); + } + } + + private void doLogSqlResultPreview(String[] columnLabels, ArrayNode resultJSONArray, int displayRows) { + logSqlResultLine(String.join("\t", columnLabels)); + for (int i = 0; i < displayRows; i++) { + ObjectNode rowNode = (ObjectNode) resultJSONArray.get(i); + List rowValues = new ArrayList<>(columnLabels.length); + for (String columnLabel : columnLabels) { + rowValues.add(formatSqlResultValue(rowNode.get(columnLabel))); + } + logSqlResultLine(String.join("\t", rowValues)); + } + } + + private void logSqlResultLine(String line) { + if (StringUtils.isBlank(taskExecutionContext.getTaskOutputLogPath())) { + log.info(line); + return; + } + TASK_OUTPUT_LOGGER.info(line); + } + + private String formatSqlResultValue(com.fasterxml.jackson.databind.JsonNode valueNode) { + if (valueNode == null || valueNode.isNull()) { + return "NULL"; + } + if (valueNode.getNodeType() == JsonNodeType.STRING) { + return valueNode.asText(); + } + if (valueNode.isValueNode()) { + return valueNode.asText(); + } + return JSONUtils.toJsonString(valueNode); + } + /** * send alert as an attachment * diff --git a/dolphinscheduler-ui/src/components/log-modal/index.tsx b/dolphinscheduler-ui/src/components/log-modal/index.tsx index acc3cdbdb892..36e77deade61 100644 --- a/dolphinscheduler-ui/src/components/log-modal/index.tsx +++ b/dolphinscheduler-ui/src/components/log-modal/index.tsx @@ -56,6 +56,10 @@ const props = { showDownloadLog: { type: Boolean as PropType, default: false + }, + title: { + type: String as PropType, + default: '' } } @@ -126,7 +130,7 @@ export default defineComponent({ return ( { + variables.showOutputModalRef = false + } + const getLogs = (row: any) => { const { state } = useAsyncState( queryLog({ @@ -132,6 +136,27 @@ const BatchTaskInstance = defineComponent({ return state } + const getOutputs = (row: any) => { + const { state } = useAsyncState( + queryTaskOutput({ + taskInstanceId: Number(row.id), + limit: variables.limit, + skipLineNum: variables.skipLineNum + }).then((res: any) => { + variables.outputRef += res.message || '' + if (res && res.message !== '') { + variables.skipLineNum += res.lineNum + getOutputs(row) + } else { + variables.outputLoadingRef = false + } + }), + {} + ) + + return state + } + const refreshLogs = (row: any) => { variables.logRef = '' variables.limit = 1000 @@ -139,6 +164,13 @@ const BatchTaskInstance = defineComponent({ getLogs(row) } + const refreshOutputs = (row: any) => { + variables.outputRef = '' + variables.limit = 1000 + variables.skipLineNum = 0 + getOutputs(row) + } + const trim = getCurrentInstance()?.appContext.config.globalProperties.trim onMounted(() => { @@ -156,7 +188,6 @@ const BatchTaskInstance = defineComponent({ if (variables.showModalRef) { getLogs(variables.row) } else { - variables.row = {} variables.logRef = '' variables.logLoadingRef = true variables.skipLineNum = 0 @@ -165,6 +196,20 @@ const BatchTaskInstance = defineComponent({ } ) + watch( + () => variables.showOutputModalRef, + () => { + if (variables.showOutputModalRef) { + getOutputs(variables.row) + } else { + variables.outputRef = '' + variables.outputLoadingRef = true + variables.skipLineNum = 0 + variables.limit = 1000 + } + } + ) + return { t, ...toRefs(variables), @@ -179,7 +224,9 @@ const BatchTaskInstance = defineComponent({ onClearSearchStateType, onClearSearchTime, onConfirmModal, + onConfirmOutputModal, refreshLogs, + refreshOutputs, trim } }, @@ -190,8 +237,10 @@ const BatchTaskInstance = defineComponent({ onUpdatePageSize, onSearch, onConfirmModal, + onConfirmOutputModal, loadingRef, - refreshLogs + refreshLogs, + refreshOutputs } = this return ( @@ -295,6 +344,14 @@ const BatchTaskInstance = defineComponent({ onConfirmModal={onConfirmModal} onRefreshLogs={refreshLogs} /> + ) } diff --git a/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx b/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx index 5fd162e1377e..e0c5e8ebea2e 100644 --- a/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx +++ b/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx @@ -37,7 +37,7 @@ import { SearchOutlined } from '@vicons/antd' import { useTable } from './use-stream-table' import { useI18n } from 'vue-i18n' import { useAsyncState } from '@vueuse/core' -import { queryLog } from '@/service/modules/log' +import { queryLog, queryTaskOutput } from '@/service/modules/log' import { streamStateType } from '@/common/common' import Card from '@/components/card' import LogModal from '@/components/log-modal' @@ -93,6 +93,10 @@ const BatchTaskInstance = defineComponent({ variables.showModalRef = false } + const onConfirmOutputModal = () => { + variables.showOutputModalRef = false + } + const getLogs = (row: any) => { const { state } = useAsyncState( queryLog({ @@ -114,6 +118,27 @@ const BatchTaskInstance = defineComponent({ return state } + const getOutputs = (row: any) => { + const { state } = useAsyncState( + queryTaskOutput({ + taskInstanceId: Number(row.id), + limit: variables.limit, + skipLineNum: variables.skipLineNum + }).then((res: any) => { + variables.outputRef += res.message || '' + if (res && res.message !== '') { + variables.skipLineNum += res.lineNum + getOutputs(row) + } else { + variables.outputLoadingRef = false + } + }), + {} + ) + + return state + } + const refreshLogs = (row: any) => { variables.logRef = '' variables.limit = 1000 @@ -121,6 +146,13 @@ const BatchTaskInstance = defineComponent({ getLogs(row) } + const refreshOutputs = (row: any) => { + variables.outputRef = '' + variables.limit = 1000 + variables.skipLineNum = 0 + getOutputs(row) + } + const trim = getCurrentInstance()?.appContext.config.globalProperties.trim onMounted(() => { @@ -145,7 +177,6 @@ const BatchTaskInstance = defineComponent({ if (variables.showModalRef) { getLogs(variables.row) } else { - variables.row = {} variables.logRef = '' variables.logLoadingRef = true variables.skipLineNum = 0 @@ -154,6 +185,20 @@ const BatchTaskInstance = defineComponent({ } ) + watch( + () => variables.showOutputModalRef, + () => { + if (variables.showOutputModalRef) { + getOutputs(variables.row) + } else { + variables.outputRef = '' + variables.outputLoadingRef = true + variables.skipLineNum = 0 + variables.limit = 1000 + } + } + ) + return { t, ...toRefs(variables), @@ -167,7 +212,9 @@ const BatchTaskInstance = defineComponent({ onClearSearchStateType, onClearSearchTime, onConfirmModal, + onConfirmOutputModal, refreshLogs, + refreshOutputs, trim } }, @@ -178,8 +225,10 @@ const BatchTaskInstance = defineComponent({ onUpdatePageSize, onSearch, onConfirmModal, + onConfirmOutputModal, loadingRef, - refreshLogs + refreshLogs, + refreshOutputs } = this return ( @@ -274,6 +323,14 @@ const BatchTaskInstance = defineComponent({ onConfirmModal={onConfirmModal} onRefreshLogs={refreshLogs} /> + ) } diff --git a/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts b/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts index 1aff5c4d6712..711a0b38569f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts +++ b/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts @@ -19,6 +19,7 @@ import { useI18n } from 'vue-i18n' import { h, reactive, ref } from 'vue' import { downloadLog, + downloadOutput, queryTaskListPaging, savePoint, streamTaskStop @@ -27,6 +28,8 @@ import { NButton, NIcon, NSpace, NTooltip, NSpin } from 'naive-ui' import { AlignLeftOutlined, DownloadOutlined, + EyeOutlined, + FileSearchOutlined, RetweetOutlined, SaveOutlined, StopOutlined @@ -63,10 +66,13 @@ export function useTable() { workflowDefinitionName: null, totalPage: 1, showModalRef: false, + showOutputModalRef: false, row: {}, loadingRef: false, logRef: '', + outputRef: '', logLoadingRef: true, + outputLoadingRef: true, skipLineNum: 0, limit: 1000 }) @@ -144,7 +150,7 @@ export function useTable() { { title: t('project.task.operation'), key: 'operation', - ...COLUMN_WIDTH_CONFIG['operation'](5), + ...COLUMN_WIDTH_CONFIG['operation'](7), render(row: any) { return h(NSpace, null, { default: () => [ @@ -220,6 +226,30 @@ export function useTable() { default: () => t('project.task.view_log') } ), + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host, + onClick: () => handleOutput(row) + }, + { + icon: () => + h(NIcon, null, { + default: () => h(EyeOutlined) + }) + } + ), + default: () => t('project.task.view_output') + } + ), h( NTooltip, {}, @@ -241,6 +271,28 @@ export function useTable() { default: () => t('project.task.download_log') } ), + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host, + onClick: () => downloadOutput(row.id) + }, + { + icon: () => + h(NIcon, null, { default: () => h(FileSearchOutlined) }) + } + ), + default: () => t('project.task.download_output') + } + ), h( NTooltip, {}, @@ -280,6 +332,11 @@ export function useTable() { variables.row = row } + const handleOutput = (row: any) => { + variables.showOutputModalRef = true + variables.row = row + } + const getTableData = () => { if (variables.loadingRef) return variables.loadingRef = true diff --git a/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts b/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts index ba42abc16ff0..f3da8e77e9e4 100644 --- a/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts +++ b/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts @@ -21,14 +21,18 @@ import { useAsyncState } from '@vueuse/core' import { queryTaskListPaging, forceSuccess, - downloadLog + downloadLog, + downloadOutput } from '@/service/modules/task-instances' +import { queryTaskOutput } from '@/service/modules/log' import { NButton, NIcon, NSpace, NTooltip, NSpin, NEllipsis } from 'naive-ui' import ButtonLink from '@/components/button-link' import { AlignLeftOutlined, CheckCircleOutlined, - DownloadOutlined + DownloadOutlined, + EyeOutlined, + FileSearchOutlined } from '@vicons/antd' import { format } from 'date-fns' import { useRoute, useRouter } from 'vue-router' @@ -67,10 +71,13 @@ export function useTable() { workflowInstanceName: ref(null), totalPage: ref(1), showModalRef: ref(false), + showOutputModalRef: ref(false), row: {}, loadingRef: ref(false), logRef: '', + outputRef: '', logLoadingRef: ref(true), + outputLoadingRef: ref(true), skipLineNum: ref(0), limit: ref(1000) }) @@ -192,7 +199,7 @@ export function useTable() { { title: t('project.task.operation'), key: 'operation', - ...COLUMN_WIDTH_CONFIG['operation'](3), + ...COLUMN_WIDTH_CONFIG['operation'](5), render(row: any) { return h(NSpace, null, { default: () => [ @@ -251,6 +258,30 @@ export function useTable() { default: () => t('project.task.view_log') } ), + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host, + onClick: () => handleOutput(row) + }, + { + icon: () => + h(NIcon, null, { + default: () => h(EyeOutlined) + }) + } + ), + default: () => t('project.task.view_output') + } + ), h( NTooltip, {}, @@ -272,6 +303,28 @@ export function useTable() { ), default: () => t('project.task.download_log') } + ), + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host, + onClick: () => downloadOutput(row.id) + }, + { + icon: () => + h(NIcon, null, { default: () => h(FileSearchOutlined) }) + } + ), + default: () => t('project.task.download_output') + } ) ] }) @@ -288,6 +341,11 @@ export function useTable() { variables.row = row } + const handleOutput = (row: any) => { + variables.showOutputModalRef = true + variables.row = row + } + const handleForcedSuccess = (row: any) => { forceSuccess({ id: row.id }, { projectCode }).then(() => { getTableData({ diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java index 7330bddcc270..1464d0d276ad 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java @@ -60,7 +60,8 @@ public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecution } private void assemblyTaskLogPath(final TaskExecutionContext taskExecutionContext) { - taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); + taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext, "log")); + taskExecutionContext.setTaskOutputLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext, "out")); } } diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 762076c03cd6..f84a27359881 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -33,7 +33,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -51,6 +53,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-worker.log @@ -77,4 +98,8 @@ + + + + diff --git a/dolphinscheduler-worker/src/test/resources/logback.xml b/dolphinscheduler-worker/src/test/resources/logback.xml index 916d79e35489..7ab6138cd408 100644 --- a/dolphinscheduler-worker/src/test/resources/logback.xml +++ b/dolphinscheduler-worker/src/test/resources/logback.xml @@ -32,7 +32,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -50,6 +52,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-worker.log @@ -72,4 +93,8 @@ + + + +