Skip to content
This repository was archived by the owner on May 31, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 99 additions & 26 deletions packages/cli/internal/pkg/cli/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
var (
compressToTmp = zipfile.CompressToTmp
workflowZip = "workflow.zip"
manifestFilename = storage.ManifestFileName
removeFile = os.Remove
removeAll = os.RemoveAll
osStat = os.Stat
Expand Down Expand Up @@ -73,6 +74,8 @@ type runProps struct {
path string
packPath string
workflowUrl string
manifestPath string
tempPath string
inputsPath string
input Input
optionFileUrl string
Expand Down Expand Up @@ -118,6 +121,12 @@ type workflowOutputProps struct {
workflowRunLogOutputs map[string]interface{}
}

type ManifestProps struct {
MainWorkflowURL string `json:"mainWorkflowURL"`
InputFileURLs []string `json:"inputFileURLs"`
EngineOptions string `json:"engineOptions"`
}

type Manager struct {
Project storage.ProjectClient
Config storage.ConfigClient
Expand Down Expand Up @@ -218,7 +227,9 @@ func (m *Manager) isUploadRequired() bool {
scheme := strings.ToLower(m.parsedSourceURL.Scheme)
m.isLocal = scheme == "" || scheme == "file"
log.Debug().Msgf("workflow location is local? '%t', upload is required? '%t'", m.isLocal, m.isLocal)
return m.isLocal
inputIncluded := m.inputsPath != ""
log.Debug().Msgf("does input file exist? '%t'", inputIncluded)
return m.isLocal || inputIncluded
}

func (m *Manager) setWorkflowPath() {
Expand All @@ -227,6 +238,7 @@ func (m *Manager) setWorkflowPath() {
}
projectLocation := m.Project.GetLocation()
workflowPath := m.parsedSourceURL.Path
log.Debug().Msgf("location ay: %s \n workflow %s", projectLocation, workflowPath)
m.path = filepath.Join(projectLocation, workflowPath)
log.Debug().Msgf("workflow path is '%s", m.path)
}
Expand All @@ -242,41 +254,21 @@ func (m *Manager) packWorkflowPath() {
return
}

var absoluteWorkflowPath string
if fileInfo.IsDir() {
absoluteWorkflowPath, err = createTempDir("", "workflow_*")
log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", absoluteWorkflowPath)
if err != nil {
m.err = err
return
}
defer func() {
err = removeAll(absoluteWorkflowPath)
if err != nil {
log.Warn().Msgf("Failed to delete temporary folder '%s'", m.packPath)
}
}()

log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, absoluteWorkflowPath)
err = copyFileRecursivelyToLocation(absoluteWorkflowPath, m.path)
if err != nil {
log.Error().Err(err)
m.err = err
return
}

log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", m.tempPath)
defer m.deleteTempDir()
log.Debug().Msgf("updating file references and loading packed content to '%s/%s'", m.bucketName, m.baseWorkflowKey)
err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, absoluteWorkflowPath, m.bucketName, m.baseWorkflowKey)
err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey)
if err != nil {
log.Error().Err(err)
m.err = err
return
}
} else {
absoluteWorkflowPath = m.path
m.tempPath = m.path
}

m.packPath, m.err = compressToTmp(absoluteWorkflowPath)
m.packPath, m.err = compressToTmp(m.tempPath)
}

func (m *Manager) setOutputBucket() {
Expand Down Expand Up @@ -339,6 +331,26 @@ func (m *Manager) readInput(inputUrl string) {
m.input = input
}

func (m *Manager) copyInputToTemp() {
if m.err != nil || m.inputsPath == "" {
return
}
log.Debug().Msgf("Copying input file to temp: %s", m.tempPath)
absInputsPath, err := filepath.Abs(m.inputsPath)
bytes, err := m.Storage.ReadAsBytes(m.inputsPath)

if err != nil {
m.err = err
return
}
dest := filepath.Join(m.tempPath, filepath.Base(absInputsPath))
err = m.Storage.WriteFromBytes(dest, bytes)
if err != nil {
m.err = err
return
}
}

func (m *Manager) parseInputToArguments() {
if m.err != nil || m.input == nil {
return
Expand All @@ -348,6 +360,67 @@ func (m *Manager) parseInputToArguments() {
m.arguments = []string{arguments}
}

func (m *Manager) initializeTempDir() {
if m.err != nil {
return
}
var err error
m.tempPath, err = createTempDir("", "workflow_*")
log.Debug().Msgf("created temp directory at: '%s'", m.tempPath)
if err != nil {
m.err = err
return
}
log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, m.tempPath)
err = copyFileRecursivelyToLocation(m.tempPath, m.path)
if err != nil {
log.Error().Err(err)
m.err = err
return
}
}

func (m *Manager) deleteTempDir() {
if m.tempPath == "" {
return
}
err := removeAll(m.tempPath)
if err != nil {
log.Warn().Msgf("Failed to delete temporary folder '%s'", m.tempPath)
}
}

// writeTempManifest writes the inputsFile included in the command line to the temporary MANIFEST.json located in temp directory
// This function is only called if there is a path included in the command line with the --inputsFile flag
func (m *Manager) writeTempManifest() {
if m.err != nil || m.inputsPath == "" {
return
}
m.manifestPath = filepath.Join(m.tempPath, manifestFilename)
log.Debug().Msgf("Reading temp manifest %s", m.manifestPath)
bytes, err := m.Storage.ReadAsBytes(m.manifestPath)
if err != nil {
m.err = err
return
}
var data ManifestProps
if err := json.Unmarshal(bytes, &data); err != nil {
m.err = err
return
}
data.InputFileURLs = append(data.InputFileURLs, filepath.Base(m.inputsPath))
bytes, err = json.Marshal(data)
if err != nil {
m.err = err
return
}
err = m.Storage.WriteFromBytes(m.manifestPath, bytes)
if err != nil {
m.err = err
return
}
}

func (m *Manager) uploadInputsToS3() {
if m.err != nil || m.input == nil {
return
Expand Down
11 changes: 7 additions & 4 deletions packages/cli/internal/pkg/cli/workflow/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o
m.validateContextIsDeployed(contextName)
m.setOutputBucket()
m.parseWorkflowLocation()
m.setWorkflowPath()
m.initializeTempDir()
m.readInput(inputsFileUrl)
m.uploadInputsToS3()
m.copyInputToTemp()
m.writeTempManifest()
m.parseInputToArguments()
if m.isUploadRequired() {
m.setBaseObjectKey(contextName, workflowName)
m.setWorkflowPath()
m.packWorkflowPath()
m.uploadWorkflowToS3()
m.cleanUpWorkflow()
}
m.calculateFinalLocation()
m.readInput(inputsFileUrl)
m.uploadInputsToS3()
m.parseInputToArguments()
m.readOptionFile(optionFileUrl)
m.setContextStackInfo(contextName)
m.setWesUrl()
Expand Down
Loading