From 42f7318963f3ef774c2e8795e5f31ab09f5ab1d7 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 1 Sep 2016 09:38:27 -0400 Subject: [PATCH] Reuse nodejs subprocess for faster expressions (#178) * Reuse javascript subprocess to reduce script evaluation overhead. --- cwltool/cwlNodeEngine.js | 13 ++++++ cwltool/sandboxjs.py | 74 +++++++++++++++++++++++++------ node-expr-engine/cwlNodeEngine.js | 46 ------------------- setup.py | 3 +- 4 files changed, 76 insertions(+), 60 deletions(-) create mode 100755 cwltool/cwlNodeEngine.js delete mode 100755 node-expr-engine/cwlNodeEngine.js diff --git a/cwltool/cwlNodeEngine.js b/cwltool/cwlNodeEngine.js new file mode 100755 index 000000000..ca75f6130 --- /dev/null +++ b/cwltool/cwlNodeEngine.js @@ -0,0 +1,13 @@ +"use strict"; +process.stdin.setEncoding('utf8'); +var incoming = ""; +process.stdin.on('data', function(chunk) { + incoming += chunk; + var i = incoming.indexOf("\n"); + if (i > -1) { + var fn = JSON.parse(incoming.substr(0, i)); + incoming = incoming.substr(i+1); + process.stdout.write(JSON.stringify(require("vm").runInNewContext(fn, {})) + "\n"); + } +}); +process.stdin.on('end', process.exit); diff --git a/cwltool/sandboxjs.py b/cwltool/sandboxjs.py index 974084a3e..c65635ade 100644 --- a/cwltool/sandboxjs.py +++ b/cwltool/sandboxjs.py @@ -3,24 +3,36 @@ import threading import errno import logging -from typing import Any, Dict, List, Mapping, Text, TypeVar, Union +import select +import os +import cStringIO +from cStringIO import StringIO +from typing import Any, Dict, List, Mapping, Text, TypeVar, Union +from pkg_resources import resource_stream class JavascriptException(Exception): pass _logger = logging.getLogger("cwltool") -JSON = Union[Dict[Any,Any], List[Any], Text, int, long, float, bool, None] +JSON = Union[Dict[Text,Any], List[Any], Text, int, long, float, bool, None] + +localdata = threading.local() have_node_slim = False -def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON +def new_js_proc(): + # type: () -> subprocess.Popen + + res = resource_stream(__name__, 'cwlNodeEngine.js') + nodecode = res.read() + nodejs = None trynodes = ("nodejs", "node") for n in trynodes: try: - nodejs = subprocess.Popen([n], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + nodejs = subprocess.Popen([n, "--eval", nodecode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) break except OSError as e: if e.errno == errno.ENOENT: @@ -39,7 +51,7 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) - nodejs = subprocess.Popen(["docker", "run", "--attach=STDIN", "--attach=STDOUT", "--attach=STDERR", "--sig-proxy=true", "--interactive", - "--rm", nodeimg], + "--rm", nodeimg, "node", "--eval", nodecode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except OSError as e: if e.errno == errno.ENOENT: @@ -55,15 +67,24 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) - "expressions, but couldn't find it. Tried %s, docker run " "node:slim" % u", ".join(trynodes)) + return nodejs + + +def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON + + if not hasattr(localdata, "proc") or localdata.proc.poll() is not None: + localdata.proc = new_js_proc() + + nodejs = localdata.proc + fn = u"\"use strict\";\n%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js)) - script = u"console.log(JSON.stringify(require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn) killed = [] def term(): try: - nodejs.kill() killed.append(True) + nodejs.kill() except OSError: pass @@ -73,17 +94,44 @@ def term(): tm = threading.Timer(timeout, term) tm.start() - stdoutdata, stderrdata = nodejs.communicate(script) + stdin_buf = StringIO(json.dumps(fn)+"\n") + stdout_buf = StringIO() + stderr_buf = StringIO() + + completed = [] # type: List[Union[cStringIO.InputType, cStringIO.OutputType]] + while len(completed) < 3: + rready, wready, _ = select.select([nodejs.stdout, nodejs.stderr], [nodejs.stdin], []) + if nodejs.stdin in wready: + b = stdin_buf.read(select.PIPE_BUF) + if b: + os.write(nodejs.stdin.fileno(), b) + elif stdin_buf not in completed: + completed.append(stdin_buf) + for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)): + if pipes[0] in rready: + b = os.read(pipes[0].fileno(), select.PIPE_BUF) + if b: + pipes[1].write(b) + elif pipes[1] not in completed: + completed.append(pipes[1]) + if stdout_buf.getvalue().endswith("\n"): + for buf in (stdout_buf, stderr_buf): + if buf not in completed: + completed.append(buf) tm.cancel() + stdin_buf.close() + stdoutdata = stdout_buf.getvalue() + stderrdata = stderr_buf.getvalue() + def fn_linenum(): # type: () -> Text return u"\n".join(u"%04i %s" % (i+1, b) for i, b in enumerate(fn.split("\n"))) - if killed: - raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum())) - - if nodejs.returncode != 0: - raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata)) + if nodejs.poll() not in (None, 0): + if killed: + raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum())) + else: + raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata)) else: try: return json.loads(stdoutdata) diff --git a/node-expr-engine/cwlNodeEngine.js b/node-expr-engine/cwlNodeEngine.js deleted file mode 100755 index 112958497..000000000 --- a/node-expr-engine/cwlNodeEngine.js +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env nodejs - -"use strict"; - -process.stdin.setEncoding('utf8'); - -var incoming = ""; - -process.stdin.on('readable', function() { - var chunk = process.stdin.read(); - if (chunk !== null) { - incoming += chunk; - } -}); - -process.stdin.on('end', function() { - var j = JSON.parse(incoming); - var exp = "" - - if (j.script[0] == "{") { - exp = "{return function()" + j.script + "();}"; - } - else { - exp = "{return " + j.script + ";}"; - } - - var fn = '"use strict";\n'; - - if (j.engineConfig) { - for (var index = 0; index < j.engineConfig.length; ++index) { - fn += j.engineConfig[index] + "\n"; - } - } - - fn += "var $job = " + JSON.stringify(j.job) + ";\n"; - fn += "var $self = " + JSON.stringify(j.context) + ";\n" - - fn += "var $runtime = " + JSON.stringify(j.runtime) + ";\n" - fn += "var $tmpdir = " + JSON.stringify(j.tmpdir) + ";\n" - fn += "var $outdir = " + JSON.stringify(j.outdir) + ";\n" - - - fn += "(function()" + exp + ")()"; - - process.stdout.write(JSON.stringify(require("vm").runInNewContext(fn, {}))); -}); diff --git a/setup.py b/setup.py index 8e9eb413a..5298c3678 100755 --- a/setup.py +++ b/setup.py @@ -34,7 +34,8 @@ 'schemas/v1.0/*.yml', 'schemas/v1.0/*.md', 'schemas/v1.0/salad/schema_salad/metaschema/*.yml', - 'schemas/v1.0/salad/schema_salad/metaschema/*.md']}, + 'schemas/v1.0/salad/schema_salad/metaschema/*.md', + 'cwlNodeEngine.js']}, install_requires=[ 'requests', 'ruamel.yaml == 0.12.4',