-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathexecuteWorkflow.R
100 lines (90 loc) · 3.49 KB
/
executeWorkflow.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#' @name executeWorkflow
#' @aliases executeWorkflow
#' @title executeWorkflow
#' @description \code{executeWorkflow} allows to execute a workflow
#'
#' @usage executeWorkflow(file, dir, queue,
#' on_initWorkflowJob, on_initWorkflow, on_closeWorkflow,
#' monitor, session)
#'
#' @param file a JSON geoflow configuration file
#' @param dir a directory where to execute the workflow
#' @param queue an \pkg{ipc} queue to use geoflow in \pkg{geoflow-shiny}
#' @param on_initWorkflowJob a function to trigger once \code{initWorkflowJob} is executed
#' @param on_initWorkflow a function to trigger once \code{initWorkflow} is executed
#' @param on_closeWorkflow a function to trigger once \code{closeWorkflow} is executed
#' @param monitor a monitor function to increase progress bar
#' @param session a \pkg{shiny} session object (optional) to run geoflow in a \pkg{shiny} context
#' @return the path of the job directory
#'
#' @author Emmanuel Blondel, \email{emmanuel.blondel1@@gmail.com}
#' @export
#'
executeWorkflow <- function(file, dir = ".",
queue = NULL,
on_initWorkflowJob = NULL,
on_initWorkflow = NULL,
on_closeWorkflow = NULL,
monitor = NULL,
session = NULL){
#options
.defaultOptions <- options()
options(stringsAsFactors = FALSE)
options(gargle_oob_default = TRUE)
#optional shiny session object
if(!is.null(session)) if(!is(session, "ShinySession")){
stop("The 'session' argument should specify an object of class 'ShinySession'")
}
#1. Init the workflow based on configuration file
wd <- getwd()
config <- list()
jobDirPath <- initWorkflowJob(dir = dir)
config$job <- jobDirPath
if(!is.null(on_initWorkflowJob)){
on_initWorkflowJob(config = config, queue = queue)
}
capture.output({
setwd(wd)
config <- try(initWorkflow(file = file, dir = dir, jobDirPath = jobDirPath, session = session))
if(is(config,"try-error")){
stop(sprintf("Workflow failed during initialization, check logs at: %s", file.path(jobDirPath, "job-logs.txt")))
}
setwd(jobDirPath)
config$debug <- FALSE
if(!is.null(on_initWorkflow)){
on_initWorkflow(config = config, queue = queue)
}
}, file = file.path(jobDirPath, "job-logs.txt"))
#manage monitor
if(!is.null(monitor)){
if(is.function(monitor)){
if(all(names(formals(monitor))==c("step","config","entity","action","queue"))){
config$logger.info("use monitor function to trace processing steps")
}else{
config$logger.info("Monitor function escaped, parameter(s) missing or in wrong order")
monitor=NULL
}
}else{
config$logger.info("Monitor escaped, monitor must be a function")
monitor = NULL
}
}
#2. Execute the workflow job
capture.output({
exec <- try(executeWorkflowJob(config, queue = queue, monitor = monitor))
if(is(exec,"try-error")){
setwd(wd)
closeWorkflow(config)
stop(sprintf("Workflow failed during execution, check logs at: %s", file.path(config$job, "job-logs.txt")))
}
#3. close workflow
closeWorkflow(config)
if(!is.null(on_closeWorkflow)){
on_closeWorkflow(config = config, queue = queue)
}
}, file = file.path(config$job, "job-logs.txt"), append = TRUE)
#reset options
options(.defaultOptions)
setwd(wd)
return(config$job)
}