Workflow examples
The following workflow performs a streaming Map/Reduce analysis of tweets about Krakow. It demonstrates an important feature of the HyperFlow workflow model: operation as a continuous process network. Every process works according to the following cycle:
- Awaiting signals. Signals received by a process are queued on its input ports.
- Invoking the function (firing). If the firing condition is fulfilled, i.e. the set of signals required for the process to fire has been received, these signals are consumed (removed from the queues) and the function of the process is invoked (a minimal function skeleton is sketched after this list).
- Emitting output signals. Outputs returned by the function are emitted as signals. The process is then ready for the next firing.
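The function invoked at each firing follows the same signature used throughout the examples below: it receives the consumed input signals, templates for the output signals, and a callback used to return the outputs to the engine. A minimal, purely illustrative skeleton (not taken from the repository) looks like this:
// Minimal, illustrative skeleton of a process function (not an actual HyperFlow example).
// 'ins'  - input signals consumed for this firing; ins[i].data is an array of values
// 'outs' - output signal templates; values placed in outs[i].data are emitted after firing
// 'cb'   - callback handing the outputs back to the engine
function passThrough(ins, outs, executor, config, cb) {
    var value = ins[0].data[0];   // first value of the first input signal
    outs[0].data = [ value ];     // emit it unchanged on the first output
    cb(null, outs);               // this firing is done; the engine emits 'outs'
}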

The first process of the workflow (TweetSource) is a source process: it has no input signals and simply emits output signals (recent tweets about Krakow) at regular intervals. To this end, the process is configured with the attribute firingInterval: 15000, which tells the engine to fire the process every 15 seconds. The process is defined as a dataflow process:
{
    "name": "TweetSource",
    "type": "dataflow",
    "function": "twitterSource",
    "firingInterval": 15000,
    "ins": [ ],
    "outs": [ 0 ]
}
Once fired, the function of the process uses the Twitter search API to retrieve all new tweets posted since the last firing. The tweets are then returned as an array of results which the engine emits as separate signals. In other words, the process can produce zero or more signals in each firing. Let's have a look at how this part is implemented in the task's function:
...
twit.get('/search/tweets', options, function(err, data, response) {
    // ... code omitted //
    data.statuses.forEach(function(t) {
        outs[0].data.push(t); // each element of 'outs[0].data' will be emitted as a separate signal
    });
    cb(null, outs);
});
...
The Partitioner process is responsible only for routing tweets received from TweetSource to one of the four Mappers. The Partitioner is defined as a choice process which copies its input signal to one of its outputs, selected on the basis of the CRC checksum of the tweet's text. Let's have a look at the function of the Partitioner:
function partitionTweets(ins, outs, executor, config, cb) {
    var tweet = ins[0].data[0];
    var n = (crc.crc32(tweet.text)>>>0) % (outs.length);
    outs[n].condition = "true"; // this tweet will be forwarded to the n-th output port (mapper)
    outs[n].data = [tweet];
    cb(null, outs);
}
The Mappers parse tweets to extract keywords - URIs, user names, hash tags and words - and emit them as (keyword, count) pairs to the Reducers. The Mappers are again defined as choice processes in order to partition the keywords among the available Reducers. Similarly to the Partitioner, the CRC checksum of the keyword determines to which Reducer a given pair is sent.
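The mapper function is not shown here, but based on the description above it could look roughly like the following sketch. This is an illustrative assumption, not the actual code under functions/mapreduce; for simplicity, keywords are just whitespace-separated tokens.
// Hypothetical sketch of a mapper: split the tweet text into keywords and route
// each (keyword, count) pair to the reducer selected by the keyword's CRC checksum.
var crc = require('crc');

function mapTweet(ins, outs, executor, config, cb) {
    var tweet = ins[0].data[0];
    tweet.text.split(/\s+/).forEach(function(keyword) {
        var n = (crc.crc32(keyword)>>>0) % (outs.length);
        outs[n].condition = "true";              // enable the n-th output (reducer)
        outs[n].data = outs[n].data || [];
        outs[n].data.push({ keyword: keyword, count: 1 });
    });
    cb(null, outs);
}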
Finally, the Reducers aggregate the pairs received from the Mappers (in this case they simply count the number of keyword occurrences). Though not currently implemented, the Reducers could also update the aggregated statistics in a database.
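A corresponding reducer could be sketched as follows; again, this is a hypothetical illustration rather than the shipped code, and it keeps its state in a module-level variable that survives between firings.
// Hypothetical sketch of a reducer: aggregate keyword counts in memory.
var counts = {};

function reduceKeywords(ins, outs, executor, config, cb) {
    ins[0].data.forEach(function(pair) {
        counts[pair.keyword] = (counts[pair.keyword] || 0) + pair.count;
    });
    outs[0].data = [ counts ];   // emit the current aggregate as the output signal
    cb(null, outs);
}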
The full workflow definition can be found in the [[workflows|https://github.com/dice-cyfronet/hyperflow/blob/continuous_tasks/workflows/Wf_MapReduce.json]] directory. Its functions are located under [[functions/mapreduce|https://github.com/dice-cyfronet/hyperflow/blob/continuous_tasks/functions/mapreduce/index.js]].
You can run this workflow with the following command:
node scripts/runwf.js -f workflows/Wf_MapReduce.json
To run it, you need Twitter API credentials configured in the file twitter.conf.json:
{
    "consumer_key": "...",
    "consumer_secret": "...",
    "access_token_key": "...",
    "access_token_secret": "..."
}
The next workflow performs a 'grep' on a text file: it reads the file line by line, matches each line against a regular expression, and emits the matching lines as results (as well as printing them). The features of the HyperFlow model demonstrated in this example are control signals and sticky ports.

Control signals, in contrast to data signals, are pure: they do not carry a 'value', they simply either occur or not. In HyperFlow, every signal with the attribute control: true is considered a control signal. Control signals differ from data signals as follows:
- they are not passed to the function of the process,
- control signals with certain names have special influence on the behavior of most processes.
Currently two names of control signals are distinguished, next and done; their semantics are as follows:
- next signal:
  - if present as an input, it is required for the process to fire (even if all required data signals are ready);
  - if present as an output, it is emitted after every firing of the process.
- done signal:
  - if present as an input, it causes the process to terminate right after the currently ongoing firing is finished;
  - if present as an output, it is emitted right before the process terminates.
A useful pattern for the next signal is to control the pace at which signals are produced so that overflow of the signal queues can be avoided. In the 'Grep File' workflow, the next signal is connected from the output of match to the input of getLine. As a result, successive lines of the file are produced only after match is ready to process them.
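Based only on the description above, the wiring could look roughly like the fragment below; this is a sketch, and the exact JSON in Wf_grepfile_simple.json may use different attribute names and signal indices. The next signal is declared as a control signal, listed among the outputs of match and the inputs of getLine.
"data": [
    // ... data signals (lines, regexp, results) ...
    { "name": "next", "control": true } // control signal paced by 'match'
],
"processes": [
    { "name": "getLine", "ins": [ ... ], "outs": [ ... ] }, // 'next' listed among the ins
    { "name": "match",   "ins": [ ... ], "outs": [ ... ] }  // 'next' listed among the outs
]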
Every input of a process can be declared sticky. Note that stickiness is an attribute of a process input port, not of the signal itself. Signals arriving on sticky ports are handled differently from other signals:
- When a signal is consumed from a sticky port, it is not removed from the input port (it can be consumed again immediately);
- When a signal arrives at a sticky port, it is not queued; instead, it replaces the current signal.
Sticky ports are useful for inputs of a process that do not change during the process' lifetime (or change only occasionally). This is the case with the regular expression in the 'Grep File' workflow, therefore the regexp signal is declared as sticky for the match process. Consequently, it only needs to be sent once to the match process.
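For illustration, a hypothetical match function (not necessarily the one shipped with the workflow) could read the current line from its first input and the pattern from the sticky regexp input:
// Hypothetical sketch of 'match': ins[0] carries one line of the file,
// ins[1] is the sticky 'regexp' input whose value persists across firings.
function match(ins, outs, executor, config, cb) {
    var line = ins[0].data[0];
    var re = new RegExp(ins[1].data[0]);
    if (re.test(line)) {
        console.log(line);         // print the matching line
        outs[0].data = [ line ];   // and emit it as a result signal
    }
    cb(null, outs);
}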
You can run this workflow with the following command:
node scripts/runwf.js -f workflows/Wf_grepfile_simple.json
Note that the path to the input file is given as an initial instance of signal fileName:
{
    "name": "fileName",
    "data": [ { "value": "workflows/grepfiles.hwf" } ]
}
This example shows a typical large-scale, resource-intensive scientific workflow: Montage. It demonstrates how workflow tasks can be submitted to an external computing infrastructure, in this case a cloud. Montage produces very large mosaic images of the sky by executing a large pipeline of tasks (mainly image manipulation) whose number, depending on the size of the target image, may easily reach 10,000, as in the example below. In HyperFlow, each Montage task is modeled as a separate dataflow process which fires only once. All processes invoke the same function, amqp_command, which submits a task to a message queue and waits on the same queue for the notification of task completion. The tasks are picked up by Montage workers running on virtual machines deployed on the cloud infrastructure.
{
    "name": "Montage_10k",
    "functions": [ {
        "name": "amqp_command",
        "module": "functions"
    } ],
    "processes": [ {
        "name": "mProjectPP",
        "function": "amqp_command", // sends a task to the cloud via a message queue
        "config": { // the config object is also passed to the function
            "executable": "mProjectPP",
            "args": "-X -x 0.93860 2mass-atlas-980529s-j0150174.fits p2mass-atlas-980529s-j0150174.fits region_20090720_143653_22436.hdr"
        },
        "ins": [ 0, 3 ],
        "outs": [ 1, 2 ]
    },
    // ... about 10k more processes ...
    {
        "name": "mJPEG", // final workflow task: generation of the final image
        "function": "command",
        "executor": "syscommand",
        "config": {
            "executable": "mJPEG",
            "args": "-ct 1 -gray shrunken_20090720_143653_22436.fits -1.5s 60s gaussian -out shrunken_20090720_143653_22436.jpg"
        },
        "ins": [ 22960 ],
        "outs": [ 22961 ]
    } ],
    "data": [ {
        "name": "2mass-atlas-980529s-j0150174.fits" // signals represent files and contain only file names
    },
    // ... about 23k more signals
    ],
    "ins": [ ... ],
    "outs": [ ... ]
}
The workflows directory contains examples of Montage workflows which only print the commands that would be executed. You can run them as follows (note the necessary -s flag):
node scripts/runwf.js -f workflows/Montage_10k.json -s
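For reference, a function that submits a command to a message queue and waits for a completion notification, as amqp_command is described to do, could be sketched roughly as follows. This is an illustrative assumption, not the actual implementation: it uses the amqplib callback API with a separate reply queue, and the queue name and message format are made up.
// Rough, hypothetical sketch of a queue-based command function (not the real amqp_command).
var amqp = require('amqplib/callback_api');

function amqpCommandSketch(ins, outs, executor, config, cb) {
    amqp.connect('amqp://localhost', function(err, conn) {
        if (err) { return cb(err); }
        conn.createChannel(function(err, ch) {
            if (err) { return cb(err); }
            // Exclusive, auto-named queue on which the worker reports completion
            ch.assertQueue('', { exclusive: true }, function(err, q) {
                if (err) { return cb(err); }
                ch.consume(q.queue, function(msg) {
                    conn.close();
                    cb(null, outs);   // emit output signals once the task has finished
                }, { noAck: true });
                // Publish the task (executable and arguments taken from the process config)
                var task = JSON.stringify({ executable: config.executable, args: config.args });
                ch.sendToQueue('tasks', Buffer.from(task), { replyTo: q.queue });
            });
        });
    });
}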