Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/scriptworker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
# Report this status on max_timeout. `intermittent-task` will rerun the
# task automatically. `internal-error` or other will require manual
# intervention.
"task_max_timeout_status": STATUSES["intermittent-task"],
"task_max_timeout_status": STATUSES["failure"],
"invalid_reclaim_status": STATUSES["intermittent-task"],
"task_script": ("bash", "-c", "echo foo && sleep 19 && exit 1"),
# Logging settings
Expand Down
15 changes: 15 additions & 0 deletions src/scriptworker/data/scriptworker_task_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"title": "Scriptworker task minimal schema",
"type": "object",
"properties": {
"payload": {
"type": "object",
"properties": {
"maxRunTime": {
"type": "integer"
}
}
}
},
"required": ["payload"]
}
19 changes: 16 additions & 3 deletions src/scriptworker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from taskcluster.exceptions import TaskclusterFailure

import taskcluster
from scriptworker.client import validate_json_schema
from scriptworker.constants import get_reversed_statuses
from scriptworker.exceptions import ScriptWorkerTaskException, WorkerShutdownDuringTask
from scriptworker.github import (
Expand All @@ -31,7 +32,7 @@
)
from scriptworker.log import get_log_filehandle, pipe_to_log
from scriptworker.task_process import TaskProcess
from scriptworker.utils import get_parts_of_url_path, retry_async
from scriptworker.utils import get_parts_of_url_path, load_json_or_yaml, retry_async

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -114,6 +115,15 @@ def get_run_id(claim_task):
return claim_task["runId"]


# get_task_maxruntime {{{1
def get_task_maxruntime(task_def, max_timeout):
"""Given a task definition, return the lower of max_timeout and maxRunTime if set"""
max_run_time = task_def["payload"].get("maxRunTime")
if max_run_time is None:
return max_timeout
return min(max_timeout, max_run_time)


# get_action_callback_name {{{1
def get_action_callback_name(task):
"""Get the callback name of an action task.
Expand Down Expand Up @@ -632,8 +642,11 @@ def prepare_to_run_task(context, claim_task):
dict: the contents of `current_task_info.json`

"""
schema_path = os.path.join(os.path.dirname(__file__), "data", "scriptworker_task_schema.json")
schema = load_json_or_yaml(schema_path, is_path=True)
current_task_info = {}
context.claim_task = claim_task
validate_json_schema(context.task, schema)
current_task_info["taskId"] = context.task_id
current_task_info["runId"] = get_run_id(claim_task)
log.info("Going to run taskId {taskId} runId {runId}!".format(**current_task_info))
Expand Down Expand Up @@ -661,17 +674,17 @@ async def run_task(context, to_cancellable_process):
env["TASKCLUSTER_ROOT_URL"] = context.config["taskcluster_root_url"]
kwargs = {"stdout": PIPE, "stderr": PIPE, "stdin": None, "close_fds": True, "preexec_fn": lambda: os.setsid(), "env": env} # pragma: no branch

timeout = get_task_maxruntime(context.task, context.config["task_max_timeout"])
subprocess = await asyncio.create_subprocess_exec(*context.config["task_script"], **kwargs)
context.proc = await to_cancellable_process(TaskProcess(subprocess))
timeout = context.config["task_max_timeout"]

with get_log_filehandle(context) as log_filehandle:
stderr_future = asyncio.ensure_future(pipe_to_log(context.proc.process.stderr, filehandles=[log_filehandle]))
stdout_future = asyncio.ensure_future(pipe_to_log(context.proc.process.stdout, filehandles=[log_filehandle]))
try:
_, pending = await asyncio.wait([stderr_future, stdout_future], timeout=timeout)
if pending:
message = "Exceeded task_max_timeout of {} seconds".format(timeout)
message = "Exceeded max run time of {} seconds".format(timeout)
log.warning(message)
await context.proc.stop()
raise ScriptWorkerTaskException(message, exit_code=context.config["task_max_timeout_status"])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _craft_context(rw_context):
rw_context.claim_task = {
"credentials": {"a": "b"},
"status": {"taskId": "taskId"},
"task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0"},
"task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0", "payload": {}},
"runId": 0,
}
return rw_context
Expand Down
2 changes: 1 addition & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def context(rw_context):
rw_context.claim_task = {
"credentials": {"a": "b"},
"status": {"taskId": "taskId"},
"task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0"},
"task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0", "payload": {}},
"runId": 0,
}
yield rw_context
Expand Down