Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add autoscaler plugin to Hyperflow docker image, add possibility to specify port and host when running as server #63

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
!tests
!utils
!wflib
!hyperflow-autoscaler-plugin
9 changes: 7 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
FROM node:12-alpine

#ENV PATH $PATH:/node_modules/.bin
ENV NODE_PATH=/usr/local/lib/node_modules

COPY . /hyperflow
RUN npm install -g /hyperflow

RUN mkdir -p /tmp/kubectl && cd /tmp/kubectl && apk add curl && \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" && \
install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

RUN npm install -g @hyperflow/standalone-autoscaler-plugin @hyperflow/autoscaler-plugin /hyperflow
8 changes: 4 additions & 4 deletions bin/hflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var doc = "\
Usage:\n\
hflow run <workflow_dir_or_file> [-s] [--submit=<hyperflow_server_url] [--persist] [--with-server] [--log-provenance] [--provenance-output=<provenance_file>] [-p <plugin_module_name> ...] [--var=<name=value> ...]\n\
hflow recover <persistence-log> [-p <plugin_module_name> ...] [--var=<name=value> ...]\n\
hflow start-server [-p <plugin_module_name> ...]\n\
hflow start-server [--host <hyperflow_server_host>] [--port <hyperflow_server_port>] [-p <plugin_module_name> ...]\n\
hflow send <wf_id> ( <signal_file> | -d <signal_data> ) [-p <plugin_module_name> ...]\n\
hflow -h | --help | --version";

Expand All @@ -33,13 +33,13 @@ if (opts.run) {
if (opts['--with-server']) {
hflowStartServer();
}
hflowRun(opts, function(err, engine, wfId, wfName) { });
hflowRun(opts, function(err, engine, wfId, wfName) { }, false);
} else if (opts.send) {
hflowSend(opts);
} else if (opts['start-server']) {
hflowStartServer();
hflowStartServer(opts);
} else if (opts.recover) {
hflowRun(opts, function(err, engine, wfId, wfName) { });
hflowRun(opts, function(err, engine, wfId, wfName) { }, false);
} /*else if (opts.submit) {
hflowSubmit(opts);
}*/
Expand Down
32 changes: 26 additions & 6 deletions common/wfRun.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ function handle_writes(entries, cb) {
** TODO: support external IP address and configurable port number
**
*/
function hflowStartServer() {
function hflowStartServer(opts) {
var server = require('../server/hyperflow-server.js')(rcl, wflib);
let hostname = '127.0.0.1', port = process.env.PORT;
let hostname = opts['<hyperflow_server_host>'] || '127.0.0.1';
let port = opts['<hyperflow_server_port>'] || process.env.PORT;
server.listen(port, hostname, () => {
console.log("HyperFlow server started at: http://%s:%d", server.address().address, server.address().port);
});
Expand All @@ -101,7 +102,7 @@ function hflowStartServer() {
** - wfId: unique workflow identifier
** - wfName: workflow name (from workflow.json)
*/
function hflowRun(opts, runCb) {
function hflowRun(opts, runCb, runAsServer) {
var dbId = 0,
plugins = [],
recoveryMode = false,
Expand Down Expand Up @@ -166,6 +167,22 @@ function hflowRun(opts, runCb) {
}
});

if (wfConfig.containerSpec) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering where these containerSpec files should be published? Perhaps in workflow repositories, e.g. montage2-workflow?

const spec = wfConfig.containerSpec;
wfConfig.containerSpec = new Map(Array.from(spec).map(entity => {
const jobName = entity.jobName;
const cpu = entity.cpu;
const memory = entity.memory;
const data = {
"cpu": cpu,
"memory": memory
};
return [jobName, data];
}));
} else {
wfConfig.containerSpec = new Map();
}

var runWf = function(wfId, wfName, wfJson, cb) {
var config = wfConfig;
config["emulate"] = "false";
Expand All @@ -184,9 +201,10 @@ function hflowRun(opts, runCb) {
// engine.eventServer.on('trace.*', function(exec, args) {
// console.log('Event captured: ' + exec + ' ' + args + ' job done');
// });
this.plugins = [...plugins]

await Promise.all(
plugins.map(function(plugin) {
this.plugins.map(function(plugin) {
let config = {};
if (plugin.pgType == "scheduler") {
config.wfJson = wfJson;
Expand All @@ -196,8 +214,10 @@ function hflowRun(opts, runCb) {
}
));

engine.syncCb = function () {
process.exit();
if (!runAsServer) {
engine.syncCb = function () {
process.exit();
}
}

if (opts['--log-provenance']) {
Expand Down
11 changes: 10 additions & 1 deletion engine2/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
var fs = require('fs'),
fsm = require('./automata.js'),
async = require('async'),
eventServerFactory = require('../eventlog');
eventServerFactory = require('../eventlog'),
removeBufferManager = require('../functions/kubernetes/k8sCommand').removeBufferManager;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

engine2 is a core module, it would be best not to change it unless absolutely necessary. Creating dependency to a workflow function (which can easily be refactored out to a separate package) is not good.



var ProcDataflowFSM = require('./ProcDataflowFSM.js');
Expand All @@ -39,6 +40,7 @@ fsm.registerFSM(ProcSplitterFSM);
var Engine = function(config, wflib, wfId, cb) {
this.wflib = wflib;
this.config = config;
this.config.wfId = wfId;
this.eventServer = eventServerFactory.createEventServer();
this.wfId = wfId;
this.tasks = []; // array of task FSMs
Expand All @@ -51,6 +53,7 @@ var Engine = function(config, wflib, wfId, cb) {
this.nTasksLeft = 0; // how many tasks left (not finished)?
this.nWfOutsLeft = 0; // how many workflow outputs are still to be produced?
this.syncCb = null; // callback invoked when wf instance finished execution (passed to runInstanceSync)
this.plugins = [];

this.logProvenance = false;

Expand Down Expand Up @@ -151,6 +154,12 @@ Engine.prototype.taskFinished = function(taskId) {

Engine.prototype.workflowFinished = function() {
console.log("Workflow ["+this.wfId+"] finished. Exec trace:", this.trace+"." );
removeBufferManager(this.wfId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new part should be done outside engine2. The Engine can use eventServer to publish an event which can be received in another module. Something like eventServer.emit("trace.workflow.finished") should be okay. See various examples of usage.

this.plugins.forEach((plugin) => {
if (plugin.markWorkflowFinished) {
plugin.markWorkflowFinished(this.wfId);
}
});
//onsole.log(this.syncCb);
if (this.syncCb) {
this.syncCb();
Expand Down
30 changes: 25 additions & 5 deletions functions/kubernetes/k8sCommand.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ var RestartCounter = require('./restart_counter.js').RestartCounter;
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
var fs = require('fs');

let bufferManager = new BufferManager();
let bufferManagers = {};
let restartCounters = {}

let backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
let restartCounter = new RestartCounter(backoffLimit);

// Function k8sCommandGroup
//
Expand All @@ -19,14 +19,16 @@ let restartCounter = new RestartCounter(backoffLimit);
// * outs
// * context
// * cb
async function k8sCommandGroup(bufferItems) {
async function k8sCommandGroup(wfId, bufferItems) {

// No action needed when buffer is empty
if (bufferItems.length == 0) {
return;
}

let startTime = Date.now();
let startTime = Date.now()
const bufferManager = bufferManagers[wfId];
const restartCounter = restartCounters[wfId];
console.log("k8sCommandGroup started, time:", startTime);

// Function for rebuffering items
Expand Down Expand Up @@ -165,10 +167,27 @@ async function k8sCommandGroup(bufferItems) {
return;
}

bufferManager.setCallback((items) => k8sCommandGroup(items));
// Remove the per-workflow BufferManager and RestartCounter registered for
// a finished workflow, so their state does not leak across workflow runs.
// Safe to call more than once or for an unknown wfId.
//
// @param {string} wfId - workflow identifier used as the registry key.
function removeBufferManager(wfId) {
    // 'delete' is a no-op for missing keys, so no existence checks are needed.
    delete bufferManagers[wfId];
    delete restartCounters[wfId];
}

async function k8sCommand(ins, outs, context, cb) {
/** Buffer Manager configuration. */
const wfId = context.appConfig.wfId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wfId is available as context.appId (a different name is used due to historical reasons :-)). So, no need to pass it via appConfig.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const wfId = context.appId !!! Already available!

let bufferManager = bufferManagers[wfId];
if (bufferManager === undefined) {
bufferManager = new BufferManager();
bufferManager.setCallback((items) => k8sCommandGroup(wfId, items));
bufferManagers[wfId] = bufferManager;
}
if (restartCounters[wfId] === undefined) {
restartCounters[wfId] = new RestartCounter(backoffLimit);
}
buffersConf = context.appConfig.jobAgglomerations;
let alreadyConfigured = bufferManager.isConfigured();
if (alreadyConfigured == false && buffersConf != undefined) {
Expand All @@ -191,4 +210,5 @@ async function k8sCommand(ins, outs, context, cb) {
return;
}

exports.removeBufferManager = removeBufferManager;
exports.k8sCommand = k8sCommand;
20 changes: 16 additions & 4 deletions functions/kubernetes/k8sJobSubmit.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,28 @@ function createK8sJobMessage(job, taskId, context) {
var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => {
let quotedTaskIds = taskIds.map(x => '"' + x + '"');
var command = 'hflow-job-execute ' + context.redis_url + ' -a -- ' + quotedTaskIds.join(' ');
var containerName = job.image || process.env.HF_VAR_WORKER_CONTAINER;
var containerName = context.appConfig.containerImage || job.image || process.env.HF_VAR_WORKER_CONTAINER;
var workingDirPath = context.workdir;
var volumePath = '/work_dir';
var jobName = Math.random().toString(36).substring(7) + '-' +
job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId;

// remove chars not allowed in Pod names
jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase();

var cpuRequest = job.cpuRequest || process.env.HF_VAR_CPU_REQUEST || "0.5";
var memRequest = job.memRequest || process.env.HF_VAR_MEM_REQUEST || "50Mi";
const containerSpec = context.appConfig.containerSpec;
let cpuSpec;
let memorySpec;
if (containerSpec[job.name]) {
cpuSpec = containerSpec[job.name].cpu;
memorySpec = containerSpec[job.name].memory;
} else if (containerSpec.default) {
cpuSpec = containerSpec.default.cpu;
memorySpec = containerSpec.default.memory;
}

var cpuRequest = cpuSpec || job.cpuRequest || process.env.HF_VAR_CPU_REQUEST || "0.5";
var memRequest = memorySpec || job.memRequest || process.env.HF_VAR_MEM_REQUEST || "50Mi";

// Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
Expand All @@ -80,7 +92,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
restartPolicy: restartPolicy, backoffLimit: backoffLimit,
experimentId: context.hfId + ":" + context.appId,
workflowName: context.wfname, taskName: job.name,
appId: context.appId
appId: context.appId, workingDirPath: workingDirPath
}

// Add/override custom parameters for the job
Expand Down
Loading