diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index d14e9cf2..fce412ee 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -31,6 +31,7 @@ import ( var ( compressToTmp = zipfile.CompressToTmp workflowZip = "workflow.zip" + manifestFilename = storage.ManifestFileName removeFile = os.Remove removeAll = os.RemoveAll osStat = os.Stat @@ -73,6 +74,8 @@ type runProps struct { path string packPath string workflowUrl string + manifestPath string + tempPath string inputsPath string input Input optionFileUrl string @@ -118,6 +121,12 @@ type workflowOutputProps struct { workflowRunLogOutputs map[string]interface{} } +type ManifestProps struct { + MainWorkflowURL string `json:"mainWorkflowURL"` + InputFileURLs []string `json:"inputFileURLs"` + EngineOptions string `json:"engineOptions"` +} + type Manager struct { Project storage.ProjectClient Config storage.ConfigClient @@ -218,7 +227,9 @@ func (m *Manager) isUploadRequired() bool { scheme := strings.ToLower(m.parsedSourceURL.Scheme) m.isLocal = scheme == "" || scheme == "file" log.Debug().Msgf("workflow location is local? '%t', upload is required? '%t'", m.isLocal, m.isLocal) - return m.isLocal + inputIncluded := m.inputsPath != "" + log.Debug().Msgf("does input file exist? '%t'", inputIncluded) + return m.isLocal || inputIncluded } func (m *Manager) setWorkflowPath() { @@ -227,6 +238,7 @@ func (m *Manager) setWorkflowPath() { } projectLocation := m.Project.GetLocation() workflowPath := m.parsedSourceURL.Path + log.Debug().Msgf("location ay: %s \n workflow %s", projectLocation, workflowPath) m.path = filepath.Join(projectLocation, workflowPath) log.Debug().Msgf("workflow path is '%s", m.path) } @@ -242,41 +254,21 @@ func (m *Manager) packWorkflowPath() { return } - var absoluteWorkflowPath string if fileInfo.IsDir() { - absoluteWorkflowPath, err = createTempDir("", "workflow_*") - log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", absoluteWorkflowPath) - if err != nil { - m.err = err - return - } - defer func() { - err = removeAll(absoluteWorkflowPath) - if err != nil { - log.Warn().Msgf("Failed to delete temporary folder '%s'", m.packPath) - } - }() - - log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, absoluteWorkflowPath) - err = copyFileRecursivelyToLocation(absoluteWorkflowPath, m.path) - if err != nil { - log.Error().Err(err) - m.err = err - return - } - + log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", m.tempPath) + defer m.deleteTempDir() log.Debug().Msgf("updating file references and loading packed content to '%s/%s'", m.bucketName, m.baseWorkflowKey) - err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, absoluteWorkflowPath, m.bucketName, m.baseWorkflowKey) + err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) if err != nil { log.Error().Err(err) m.err = err return } } else { - absoluteWorkflowPath = m.path + m.tempPath = m.path } - m.packPath, m.err = compressToTmp(absoluteWorkflowPath) + m.packPath, m.err = compressToTmp(m.tempPath) } func (m *Manager) setOutputBucket() { @@ -339,6 +331,26 @@ func (m *Manager) readInput(inputUrl string) { m.input = input } +func (m *Manager) copyInputToTemp() { + if m.err != nil || m.inputsPath == "" { + return + } + log.Debug().Msgf("Copying input file to temp: %s", m.tempPath) + absInputsPath, err := filepath.Abs(m.inputsPath) + bytes, err := m.Storage.ReadAsBytes(m.inputsPath) + + if err != nil { + m.err = err + return + } + dest := filepath.Join(m.tempPath, filepath.Base(absInputsPath)) + err = m.Storage.WriteFromBytes(dest, bytes) + if err != nil { + m.err = err + return + } +} + func (m *Manager) parseInputToArguments() { if m.err != nil || m.input == nil { return @@ -348,6 +360,67 @@ func (m *Manager) parseInputToArguments() { m.arguments = []string{arguments} } +func (m *Manager) initializeTempDir() { + if m.err != nil { + return + } + var err error + m.tempPath, err = createTempDir("", "workflow_*") + log.Debug().Msgf("created temp directory at: '%s'", m.tempPath) + if err != nil { + m.err = err + return + } + log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, m.tempPath) + err = copyFileRecursivelyToLocation(m.tempPath, m.path) + if err != nil { + log.Error().Err(err) + m.err = err + return + } +} + +func (m *Manager) deleteTempDir() { + if m.tempPath == "" { + return + } + err := removeAll(m.tempPath) + if err != nil { + log.Warn().Msgf("Failed to delete temporary folder '%s'", m.tempPath) + } +} + +// writeTempManifest writes the inputsFile included in the command line to the temporary MANIFEST.json located in temp directory +// This function is only called if there is a path included in the command line with the --inputsFile flag +func (m *Manager) writeTempManifest() { + if m.err != nil || m.inputsPath == "" { + return + } + m.manifestPath = filepath.Join(m.tempPath, manifestFilename) + log.Debug().Msgf("Reading temp manifest %s", m.manifestPath) + bytes, err := m.Storage.ReadAsBytes(m.manifestPath) + if err != nil { + m.err = err + return + } + var data ManifestProps + if err := json.Unmarshal(bytes, &data); err != nil { + m.err = err + return + } + data.InputFileURLs = append(data.InputFileURLs, filepath.Base(m.inputsPath)) + bytes, err = json.Marshal(data) + if err != nil { + m.err = err + return + } + err = m.Storage.WriteFromBytes(m.manifestPath, bytes) + if err != nil { + m.err = err + return + } +} + func (m *Manager) uploadInputsToS3() { if m.err != nil || m.input == nil { return diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run.go b/packages/cli/internal/pkg/cli/workflow/workflow_run.go index 206a4141..ff291762 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run.go @@ -11,17 +11,20 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o m.validateContextIsDeployed(contextName) m.setOutputBucket() m.parseWorkflowLocation() + m.setWorkflowPath() + m.initializeTempDir() + m.readInput(inputsFileUrl) + m.uploadInputsToS3() + m.copyInputToTemp() + m.writeTempManifest() + m.parseInputToArguments() if m.isUploadRequired() { m.setBaseObjectKey(contextName, workflowName) - m.setWorkflowPath() m.packWorkflowPath() m.uploadWorkflowToS3() m.cleanUpWorkflow() } m.calculateFinalLocation() - m.readInput(inputsFileUrl) - m.uploadInputsToS3() - m.parseInputToArguments() m.readOptionFile(optionFileUrl) m.setContextStackInfo(contextName) m.setWesUrl() diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index b6d0aa7b..6c78745c 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "os" "path/filepath" "testing" @@ -50,7 +51,11 @@ const ( testCompressedTmpPath = "/tmp/123/workflow_1343535" testArgsFileName = "args.txt" testArgumentsDir = "workflow/path/" + testTempArgPath = testTempDir + "/" + testArgsFileName testArgumentsPath = testArgumentsDir + testArgsFileName + testMANIFESTPath = testTempDir + "/MANIFEST.json" + testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` + testAppendedMANIFEST = "{\"mainWorkFlowURL\":\"haplotypecaller-gvcf-gatk4.wdl\",\"inputFileURLs\":[\"haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json\",\"" + testArgsFileName + "\"],\"engineOptions\":\"--no-cache\"}" testOptionFileName = "test.json" testOptionFilePath = "file://path/to/" + testOptionFileName testWesUrl = "https://TestWesUrl.com/prod" @@ -88,15 +93,32 @@ type WorkflowRunTestSuite struct { origCompressToTmp func(srcPath string) (string, error) origWriteToTmp func(namePattern, content string) (string, error) - testProjSpec spec.Project - wfInstance ddb.WorkflowInstance - testStackInfo cfn.StackInfo - workAbsDir string - inputsAbsDir string + testProjSpec spec.Project + wfInstance ddb.WorkflowInstance + testStackInfo cfn.StackInfo + workAbsDir string + inputsAbsDir string + testAppendedMANIFEST string manager *Manager } +func (s *WorkflowRunTestSuite) formatBytesToString(fileContents string) gomock.Matcher { + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", fileContents) + }), + gomock.Eq([]byte(fileContents)), + ), + ) + return eq +} + func (s *WorkflowRunTestSuite) BeforeTest(_, _ string) { s.ctrl = gomock.NewController(s.T()) s.mockProjectClient = storagemocks.NewMockProjectClient(s.ctrl) @@ -198,6 +220,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputS3)).Return(nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) @@ -228,6 +254,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputLocal)).Return(nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) @@ -275,6 +305,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_OptionsFile() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) + s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testOptionFilePath).Return([]byte(testOptionFileLocal), nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) @@ -292,6 +324,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputLocal)).Return(nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) @@ -317,10 +353,13 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) + actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", "") if s.Assert().NoError(err) { s.Assert().Equal(testRun1Id, actualId) @@ -366,6 +405,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CompressionFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot compress file" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) @@ -422,17 +462,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadArgsFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot read input" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) - s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) - s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - s.mockFileInfo.EXPECT().IsDir().Return(true) - s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) - s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte{}, errors.New(errorMessage)) - s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().Error(err) { @@ -449,15 +482,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadInputFailed() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) - s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) - s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - s.mockFileInfo.EXPECT().IsDir().Return(true) - s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) - s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) - s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) s.mockInputClient.EXPECT().UpdateInputs(s.inputsAbsDir, testInputS3Map, testOutputBucket, testFilePathKey).Return(nil, errors.New(errorMessage)) @@ -474,6 +500,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot call CFN" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) @@ -495,6 +522,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnMissingWesUrlFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "wes endpoint for workflow type 'TypeLanguage' is missing in engine stack 'TestStackId'" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(false) @@ -520,10 +548,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_WesFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot call WES" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) - s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) @@ -571,10 +599,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CreateTempDir() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot dir error" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) - s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - s.mockFileInfo.EXPECT().IsDir().Return(true) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return("", errors.New(errorMessage)) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") @@ -618,6 +644,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputS3)).Return(nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) diff --git a/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py b/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py index 8a787430..a782a378 100644 --- a/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py +++ b/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py @@ -206,7 +206,6 @@ def run_workflow( # Check if the WES workflow service is healthy. This will throw an exception if not. self._check_if_wes_service_healthy_() - self.logger.debug(f"RUN_WORKFLOW :: wes service is healthy") if not workflow_params: workflow_params = {} @@ -236,7 +235,9 @@ def run_workflow( f"RUN_WORKFLOW :: retrieving '{workflow_url}' => {tmpdir}" ) try: - props = get_workflow_from_s3(workflow_url, tmpdir, workflow_type) + props = self.get_workflow_from_s3( + workflow_url, tmpdir, workflow_type + ) except RuntimeError as e: raise InvalidRequestError(e) self.logger.debug( @@ -260,15 +261,6 @@ def run_workflow( f"RUN_WORKFLOW :: retrieved workflow attachment : {file.filename}" ) - if workflow_params.get("workflowInputs"): - if file.filename == workflow_params.get("workflowInputs"): - # these are inputs supplied at the command line - # they take highest priority and should be last on the list - if not files.get("workflowInputFiles"): - files["workflowInputFiles"] = [] - - files["workflowInputFiles"] += [file] - # create indexed workflow input keys and files # it should be workflowInputs, workflowInputs_2, ... , workflowInputs_5 if files.get("workflowInputFiles"): @@ -464,219 +456,211 @@ def _server_path(self, *args): args = [str(arg) for arg in args] return "/".join([self.url_prefix] + args) + def get_workflow_from_s3(self, s3_uri: str, localpath: str, workflow_type: str): + """ + Retrieves a workflow from S3 -def get_workflow_from_s3(s3_uri: str, localpath: str, workflow_type: str): - """ - Retrieves a workflow from S3 - - :param s3_uri: The S3 URI to the workflow (e.g. s3://bucketname/path/to/workflow.zip) - :param localpath: The location on the local filesystem to download the workflow - :param workflow_type: Type of workflow to expect (e.g. wdl, cwl, etc) + :param s3_uri: The S3 URI to the workflow (e.g. s3://bucketname/path/to/workflow.zip) + :param localpath: The location on the local filesystem to download the workflow + :param workflow_type: Type of workflow to expect (e.g. wdl, cwl, etc) - :rtype: dict of `data` and `files` + :rtype: dict of `data` and `files` - If the object is a generic file the file is set as `workflowSource` + If the object is a generic file the file is set as `workflowSource` - If the object is a `workflow.zip` file containing a single file, that file is set as `workflowSource` + If the object is a `workflow.zip` file containing a single file, that file is set as `workflowSource` - If the object is a `workflow.zip` file containing multiple files with a MANIFEST.json the MANIFEST is expected to have - * a mainWorkflowURL property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` - * optionally, if an inputFileURLs property exists that provides a list of relative file paths in the zip to input.json, it will be used to set `workflowInputs` - * optionally, if an optionFileURL property exists that provides a relative file path in the zip to an options.json file, it will be used to set `workflowOptions` + If the object is a `workflow.zip` file containing multiple files with a MANIFEST.json the MANIFEST is expected to have + * a mainWorkflowURL property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` + * optionally, if an inputFileURLs property exists that provides a list of relative file paths in the zip to input.json, it will be used to set `workflowInputs` + * optionally, if an optionFileURL property exists that provides a relative file path in the zip to an options.json file, it will be used to set `workflowOptions` - If the object is a `workflow.zip` file containing multiple files without a MANIFEST.json - * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` - * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found - * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` + If the object is a `workflow.zip` file containing multiple files without a MANIFEST.json + * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` + * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found + * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` - If the object is a `workflow.zip` file containing multiple files, the `workflow.zip` file is set as `workflowDependencies` - """ - s3 = boto3.resource("s3") - - u = urlparse(s3_uri) - bucket = s3.Bucket(u.netloc) - key = u.path[1:] + If the object is a `workflow.zip` file containing multiple files, the `workflow.zip` file is set as `workflowDependencies` + """ + s3 = boto3.resource("s3") - data = dict() - files = dict() + u = urlparse(s3_uri) + bucket = s3.Bucket(u.netloc) + key = u.path[1:] - if not key: - raise RuntimeError("invalid or missing S3 object key") + data = dict() + files = dict() - try: - file = path.join(localpath, path.basename(key)) - bucket.download_file(key, file) - except botocore.exceptions.ClientError as e: - raise RuntimeError(f"invalid S3 object: {e}") + if not key: + raise RuntimeError("invalid or missing S3 object key") - if path.basename(file) == "workflow.zip": try: - props = parse_workflow_zip_file(file, workflow_type) - except Exception as e: - raise RuntimeError(f"{s3_uri} is not a valid workflow.zip file: {e}") - - if props.get("data"): - data.update(props.get("data")) - - if props.get("files"): - files.update(props.get("files")) - else: - files["workflowSource"] = open(file, "rb") - - return {"data": data, "files": files} - + file = path.join(localpath, path.basename(key)) + bucket.download_file(key, file) + except botocore.exceptions.ClientError as e: + raise RuntimeError(f"invalid S3 object: {e}") + + if path.basename(file) == "workflow.zip": + try: + props = self.parse_workflow_zip_file(file, workflow_type) + except Exception as e: + raise RuntimeError(f"{s3_uri} is not a valid workflow.zip file: {e}") + + if props.get("data"): + data.update(props.get("data")) + + if props.get("files"): + files.update(props.get("files")) + else: + files["workflowSource"] = open(file, "rb") -def parse_workflow_zip_file(file, workflow_type): - """ - Processes a workflow zip bundle + return {"data": data, "files": files} - :param file: String or Path-like path to a workflow.zip file - :param workflow_type: String, type of workflow to expect (e.g. "wdl") + def parse_workflow_zip_file(self, file, workflow_type): + """ + Processes a workflow zip bundle - :rtype: dict of `data` and `files` + :param file: String or Path-like path to a workflow.zip file + :param workflow_type: String, type of workflow to expect (e.g. "wdl") - If the zip only contains a single file, that file is set as `workflowSource` + :rtype: dict of `data` and `files` - If the zip contains multiple files with a MANIFEST.json file, the MANIFEST is used to determine - appropriate `data` and `file` arguments. (See: parse_workflow_manifest_file()) + If the zip only contains a single file, that file is set as `workflowSource` - If the zip contains multiple files without a MANIFEST.json file: - * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` - * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found - * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` + If the zip contains multiple files with a MANIFEST.json file, the MANIFEST is used to determine + appropriate `data` and `file` arguments. (See: parse_workflow_manifest_file()) - If the zip contains multiple files, the original zip is set as `workflowDependencies` - """ - data = dict() - files = dict() + If the zip contains multiple files without a MANIFEST.json file: + * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` + * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found + * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` - wd = path.dirname(file) - with zipfile.ZipFile(file) as zip: - zip.extractall(wd) + If the zip contains multiple files, the original zip is set as `workflowDependencies` + """ + data = dict() + files = dict() - contents = zip.namelist() - if not contents: - raise RuntimeError("empty workflow.zip") + wd = path.dirname(file) + with zipfile.ZipFile(file) as zip: + zip.extractall(wd) - if len(contents) == 1: - # single file workflow - files["workflowSource"] = open(path.join(wd, contents[0]), "rb") + contents = zip.namelist() + if not contents: + raise RuntimeError("empty workflow.zip") - else: - # multifile workflow - if "MANIFEST.json" in contents: - props = parse_workflow_manifest_file(path.join(wd, "MANIFEST.json")) - - if props.get("data"): - data.update(props.get("data")) - - if props.get("files"): - files.update(props.get("files")) + if len(contents) == 1: + # single file workflow + files["workflowSource"] = open(path.join(wd, contents[0]), "rb") else: - if not f"main.{workflow_type.lower()}" in contents: - raise RuntimeError(f"'main.{workflow_type}' file not found") - - files["workflowSource"] = open( - path.join(wd, f"main.{workflow_type.lower()}"), "rb" - ) - - input_files = [f for f in contents if f.startswith("inputs")] - if input_files: - if not files.get("workflowInputFiles"): - files["workflowInputFiles"] = [] + # multifile workflow + if "MANIFEST.json" in contents: + props = self.parse_workflow_manifest_file( + path.join(wd, "MANIFEST.json") + ) - for input_file in input_files: - files[f"workflowInputFiles"] += [ - open(path.join(wd, input_file), "rb") - ] + if props.get("data"): + data.update(props.get("data")) - if "options.json" in contents: - files["workflowOptions"] = open(path.join(wd, "options.json"), "rb") + if props.get("files"): + files.update(props.get("files")) - # add the original zip bundle as a workflow dependencies file - files["workflowDependencies"] = open(file, "rb") + else: + if not f"main.{workflow_type.lower()}" in contents: + raise RuntimeError(f"'main.{workflow_type}' file not found") - return {"data": data, "files": files} + files["workflowSource"] = open( + path.join(wd, f"main.{workflow_type.lower()}"), "rb" + ) + if "options.json" in contents: + files["workflowOptions"] = open( + path.join(wd, "options.json"), "rb" + ) -def parse_workflow_manifest_file(manifest_file): - """ - Reads a MANIFEST.json file for a workflow zip bundle + # add the original zip bundle as a workflow dependencies file + files["workflowDependencies"] = open(file, "rb") - :param manifest_file: String or Path-like path to a MANIFEST.json file + return {"data": data, "files": files} - :rtype: dict of `data` and `files` + def parse_workflow_manifest_file(self, manifest_file): + """ + Reads a MANIFEST.json file for a workflow zip bundle + + :param manifest_file: String or Path-like path to a MANIFEST.json file + + :rtype: dict of `data` and `files` + + MANIFEST.json is expected to be formatted like: + .. code-block:: json + { + "mainWorkflowURL": "relpath/to/workflow", + "inputFileURLs": [ + "relpath/to/input-file-1", + "relpath/to/input-file-2", + ... + ], + "optionsFileURL" "relpath/to/option-file + } - MANIFEST.json is expected to be formatted like: - .. code-block:: json - { - "mainWorkflowURL": "relpath/to/workflow", - "inputFileURLs": [ - "relpath/to/input-file-1", - "relpath/to/input-file-2", - ... - ], - "optionsFileURL" "relpath/to/option-file - } + The `mainWorkflowURL` property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` - The `mainWorkflowURL` property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` + The inputFileURLs property is optional and provides a list of relative file paths in the zip to input.json files. The list is assumed + to be in the order the inputs should be applied - e.g. higher list index is higher priority. If present, it will be used to set + `workflowInputs(_\d)` arguments. - The inputFileURLs property is optional and provides a list of relative file paths in the zip to input.json files. The list is assumed - to be in the order the inputs should be applied - e.g. higher list index is higher priority. If present, it will be used to set - `workflowInputs(_\d)` arguments. + The optionsFileURL property is optional and provides a relative file path in the zip to an options.json file. If present, it will be + used to set `workflowOptions`. - The optionsFileURL property is optional and provides a relative file path in the zip to an options.json file. If present, it will be - used to set `workflowOptions`. + """ + data = dict() + files = dict() + with open(manifest_file, "rt") as f: + manifest = json.loads(f.read()) + u = urlparse(manifest["mainWorkflowURL"]) - """ - data = dict() - files = dict() - with open(manifest_file, "rt") as f: - manifest = json.loads(f.read()) - - u = urlparse(manifest["mainWorkflowURL"]) - if not u.scheme or u.scheme == "file": - # expect "/path/to/file" or "file:///path/to/file" - # root is relative to the zip root - files["workflowSource"] = open( - workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" - ) + if not u.scheme or u.scheme == "file": + # expect "/path/to/file" or "file:///path/to/file" + # root is relative to the zip root + files["workflowSource"] = open( + workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" + ) - else: - data["workflowUrl"] = manifest["mainWorkflowUrl"] + else: + data["workflowUrl"] = manifest["mainWorkflowUrl"] - if manifest.get("inputFileURLs"): - if not files.get("workflowInputFiles"): + if manifest.get("inputFileURLs"): files["workflowInputFiles"] = [] - for url in manifest["inputFileURLs"]: - u = urlparse(url) - if not u.scheme or u.scheme == "file": - files[f"workflowInputFiles"] += [ - open( - workflow_manifest_url_to_path(u, path.dirname(manifest_file)), - "rb", + for url in manifest["inputFileURLs"]: + u = urlparse(url) + if not u.scheme or u.scheme == "file": + files[f"workflowInputFiles"] += [ + open( + workflow_manifest_url_to_path( + u, path.dirname(manifest_file) + ), + "rb", + ) + ] + + else: + raise InvalidRequestError( + f"unsupported input file url scheme for: '{url}'" ) - ] + if manifest.get("optionsFileURL"): + u = urlparse(manifest["optionsFileURL"]) + if not u.scheme or u.scheme == "file": + files["workflowOptions"] = open( + workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" + ) else: raise InvalidRequestError( - f"unsupported input file url scheme for: '{url}'" + f"unsupported option file url scheme for: '{manifest['optionFileURL']}'" ) - if manifest.get("optionsFileURL"): - u = urlparse(manifest["optionsFileURL"]) - if not u.scheme or u.scheme == "file": - files["workflowOptions"] = open( - workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" - ) - else: - raise InvalidRequestError( - f"unsupported option file url scheme for: '{manifest['optionFileURL']}'" - ) - - return {"data": data, "files": files} + return {"data": data, "files": files} def workflow_manifest_url_to_path(url, parent_dir=None):