From 079307734bef26ee89611860ad0c89cc92c102c0 Mon Sep 17 00:00:00 2001 From: Julien Cristau Date: Tue, 7 Oct 2025 13:56:23 +0200 Subject: [PATCH 1/2] task: validate claimed task definition against minimal schema --- src/scriptworker/data/scriptworker_task_schema.json | 10 ++++++++++ src/scriptworker/task.py | 6 +++++- tests/test_task.py | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 src/scriptworker/data/scriptworker_task_schema.json diff --git a/src/scriptworker/data/scriptworker_task_schema.json b/src/scriptworker/data/scriptworker_task_schema.json new file mode 100644 index 000000000..ec2f5b7a3 --- /dev/null +++ b/src/scriptworker/data/scriptworker_task_schema.json @@ -0,0 +1,10 @@ +{ + "title": "Scriptworker task minimal schema", + "type": "object", + "properties": { + "payload": { + "type": "object" + } + }, + "required": ["payload"] +} diff --git a/src/scriptworker/task.py b/src/scriptworker/task.py index 14074a027..e4803dea3 100644 --- a/src/scriptworker/task.py +++ b/src/scriptworker/task.py @@ -20,6 +20,7 @@ from taskcluster.exceptions import TaskclusterFailure import taskcluster +from scriptworker.client import validate_json_schema from scriptworker.constants import get_reversed_statuses from scriptworker.exceptions import ScriptWorkerTaskException, WorkerShutdownDuringTask from scriptworker.github import ( @@ -31,7 +32,7 @@ ) from scriptworker.log import get_log_filehandle, pipe_to_log from scriptworker.task_process import TaskProcess -from scriptworker.utils import get_parts_of_url_path, retry_async +from scriptworker.utils import get_parts_of_url_path, load_json_or_yaml, retry_async log = logging.getLogger(__name__) @@ -632,8 +633,11 @@ def prepare_to_run_task(context, claim_task): dict: the contents of `current_task_info.json` """ + schema_path = os.path.join(os.path.dirname(__file__), "data", "scriptworker_task_schema.json") + schema = load_json_or_yaml(schema_path, is_path=True) current_task_info = {} context.claim_task = claim_task + validate_json_schema(context.task, schema) current_task_info["taskId"] = context.task_id current_task_info["runId"] = get_run_id(claim_task) log.info("Going to run taskId {taskId} runId {runId}!".format(**current_task_info)) diff --git a/tests/test_task.py b/tests/test_task.py index 9fb66b041..40f5514fa 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -46,7 +46,7 @@ def _craft_context(rw_context): rw_context.claim_task = { "credentials": {"a": "b"}, "status": {"taskId": "taskId"}, - "task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0"}, + "task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0", "payload": {}}, "runId": 0, } return rw_context From 1774784db6fb0ec1f688201f5057c042ea44dc35 Mon Sep 17 00:00:00 2001 From: Julien Cristau Date: Tue, 30 Sep 2025 14:25:28 +0200 Subject: [PATCH 2/2] feat: respect maxRunTime specified in task payload Instead of a single task_max_timeout value for all tasks, let task payloads include a maxRunTime value, and kill the task after that time has passed. Going over the timeout now returns failure instead of intermittent-task, to avoid unwanted retries. This is backwards-incompatible, since maxRunTime would previously have been ignored. --- src/scriptworker/constants.py | 2 +- src/scriptworker/data/scriptworker_task_schema.json | 7 ++++++- src/scriptworker/task.py | 13 +++++++++++-- tests/test_worker.py | 2 +- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/scriptworker/constants.py b/src/scriptworker/constants.py index b265cc456..28a5fd2f6 100644 --- a/src/scriptworker/constants.py +++ b/src/scriptworker/constants.py @@ -53,7 +53,7 @@ # Report this status on max_timeout. `intermittent-task` will rerun the # task automatically. `internal-error` or other will require manual # intervention. - "task_max_timeout_status": STATUSES["intermittent-task"], + "task_max_timeout_status": STATUSES["failure"], "invalid_reclaim_status": STATUSES["intermittent-task"], "task_script": ("bash", "-c", "echo foo && sleep 19 && exit 1"), # Logging settings diff --git a/src/scriptworker/data/scriptworker_task_schema.json b/src/scriptworker/data/scriptworker_task_schema.json index ec2f5b7a3..0501e1129 100644 --- a/src/scriptworker/data/scriptworker_task_schema.json +++ b/src/scriptworker/data/scriptworker_task_schema.json @@ -3,7 +3,12 @@ "type": "object", "properties": { "payload": { - "type": "object" + "type": "object", + "properties": { + "maxRunTime": { + "type": "integer" + } + } } }, "required": ["payload"] diff --git a/src/scriptworker/task.py b/src/scriptworker/task.py index e4803dea3..48815e022 100644 --- a/src/scriptworker/task.py +++ b/src/scriptworker/task.py @@ -115,6 +115,15 @@ def get_run_id(claim_task): return claim_task["runId"] +# get_task_maxruntime {{{1 +def get_task_maxruntime(task_def, max_timeout): + """Given a task definition, return the lower of max_timeout and maxRunTime if set""" + max_run_time = task_def["payload"].get("maxRunTime") + if max_run_time is None: + return max_timeout + return min(max_timeout, max_run_time) + + # get_action_callback_name {{{1 def get_action_callback_name(task): """Get the callback name of an action task. @@ -665,9 +674,9 @@ async def run_task(context, to_cancellable_process): env["TASKCLUSTER_ROOT_URL"] = context.config["taskcluster_root_url"] kwargs = {"stdout": PIPE, "stderr": PIPE, "stdin": None, "close_fds": True, "preexec_fn": lambda: os.setsid(), "env": env} # pragma: no branch + timeout = get_task_maxruntime(context.task, context.config["task_max_timeout"]) subprocess = await asyncio.create_subprocess_exec(*context.config["task_script"], **kwargs) context.proc = await to_cancellable_process(TaskProcess(subprocess)) - timeout = context.config["task_max_timeout"] with get_log_filehandle(context) as log_filehandle: stderr_future = asyncio.ensure_future(pipe_to_log(context.proc.process.stderr, filehandles=[log_filehandle])) @@ -675,7 +684,7 @@ async def run_task(context, to_cancellable_process): try: _, pending = await asyncio.wait([stderr_future, stdout_future], timeout=timeout) if pending: - message = "Exceeded task_max_timeout of {} seconds".format(timeout) + message = "Exceeded max run time of {} seconds".format(timeout) log.warning(message) await context.proc.stop() raise ScriptWorkerTaskException(message, exit_code=context.config["task_max_timeout_status"]) diff --git a/tests/test_worker.py b/tests/test_worker.py index 88273d2d5..ba4b5b0ec 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -30,7 +30,7 @@ def context(rw_context): rw_context.claim_task = { "credentials": {"a": "b"}, "status": {"taskId": "taskId"}, - "task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0"}, + "task": {"dependencies": ["dependency1", "dependency2"], "taskGroupId": "dependency0", "payload": {}}, "runId": 0, } yield rw_context