diff --git a/backend/apps/ifc_validation/tasks.py b/backend/apps/ifc_validation/tasks.py index 393afb4..341fed0 100644 --- a/backend/apps/ifc_validation/tasks.py +++ b/backend/apps/ifc_validation/tasks.py @@ -1,18 +1,22 @@ +from dataclasses import dataclass import os import sys import datetime import subprocess import functools import json +import time +import psutil import ifcopenshell from celery import shared_task, chain, chord, group from celery.utils.log import get_task_logger +from celery.exceptions import SoftTimeLimitExceeded from django.db import transaction from core.utils import log_execution -from apps.ifc_validation_models.settings import TASK_TIMEOUT_LIMIT, MEDIA_ROOT +from apps.ifc_validation_models.settings import MEDIA_ROOT from apps.ifc_validation_models.decorators import requires_django_user_context from apps.ifc_validation_models.models import * @@ -20,6 +24,78 @@ logger = get_task_logger(__name__) +def terminate_subprocesses(): + """Helper function to terminate all subprocesses of the current process.""" + parent = psutil.Process() + children = parent.children(recursive=True) + logger.debug(f"found child pids: {' '.join(map(str, (c.pid for c in children)))}") + + for child in children: + try: + child.terminate() + logger.debug(f"terminated {child.pid}") + except psutil.NoSuchProcess: + logger.debug(f"no such process {child.pid}") + pass + + _, alive = psutil.wait_procs(children, timeout=10) + logger.debug(f"processes still alive: {' '.join(map(str, (c.pid for c in alive)))}") + for child in alive: + try: + child.kill() + logger.debug(f"killed {child.pid}") + except psutil.NoSuchProcess: + logger.debug(f"no such process {child.pid}") + pass + + +def kill_subprocesses_on_timeout(task_func): + """Decorator to catch celery's soft time out limit and generically terminate all subprocesses""" + @functools.wraps(task_func) + def wrapper(*args, **kwargs): + try: + return task_func(*args, **kwargs) + except SoftTimeLimitExceeded: + terminate_subprocesses() + raise + return wrapper + + +@dataclass +class proc_output: + returncode : int + stdout : str + stderr : str + + +def run_subprocess_wait(*args, check=False, **kwargs): + """ + Equivalent of subprocess.run() but with a loop to keep in + user code and terminating the process when catching the + soft timeout exception in celery. + """ + process = subprocess.Popen(*args, **kwargs) + + try: + while True: + retcode = process.poll() + if retcode is not None: + break + time.sleep(0.1) + + except BaseException as e: + print("Interrupt received, terminating process...") + process.terminate() + process.wait() + print("Process terminated.") + raise e + + if check and retcode != 0: + raise subprocess.CalledProcessError(retcode, args[0]) + + stdout, stderr = process.communicate() + return proc_output(retcode, stdout, stderr) + @functools.lru_cache(maxsize=1024) def get_absolute_file_path(file_name): @@ -242,6 +318,7 @@ def instance_completion_subtask(self, prev_result, id, file_name, *args, **kwarg @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def syntax_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -266,12 +343,11 @@ def syntax_validation_subtask(self, prev_result, id, file_name, *args, **kwargs) # note: use run instead of Popen b/c PIPE output can be very big... task.set_process_details(None, check_program) # run() has no pid... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT ) # parse output @@ -358,7 +434,7 @@ def parse_info_subtask(self, prev_result, id, file_name, *args, **kwargs): code = "import ifcopenshell; ifcopenshell.open('" + file_path + "')" check_program = [sys.executable, '-c', code, file_path] logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - subprocess.run(check_program, check=True) + run_subprocess_wait(check_program, check=True) except subprocess.CalledProcessError as err: @@ -516,6 +592,7 @@ def parse_info_subtask(self, prev_result, id, file_name, *args, **kwargs): @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -543,12 +620,11 @@ def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs): # check Gherkin IP try: # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT, env=os.environ.copy() ) task.set_process_details(None, check_program) # run() has no pid... @@ -600,6 +676,7 @@ def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs): @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def schema_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -627,12 +704,11 @@ def schema_validation_subtask(self, prev_result, id, file_name, *args, **kwargs) # check schema try: # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT ) task.set_process_details(None, check_program) # run() has no pid... except subprocess.TimeoutExpired as err: @@ -735,6 +811,7 @@ def is_schema_error(line): @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -762,12 +839,11 @@ def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): # check bSDD try: # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT, env=os.environ.copy() ) task.set_process_details(None, check_program) # run() has no pid... @@ -839,6 +915,7 @@ def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -866,12 +943,11 @@ def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *arg # check Gherkin IA try: # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT, env=os.environ.copy() ) task.set_process_details(None, check_program) # run() has no pid... @@ -917,6 +993,7 @@ def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *arg @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -944,12 +1021,11 @@ def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *arg # check Gherkin IP try: # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT ) task.set_process_details(None, check_program) # run() has no pid... @@ -993,6 +1069,7 @@ def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *arg @shared_task(bind=True) @log_execution @requires_django_user_context +@kill_subprocesses_on_timeout def industry_practices_subtask(self, prev_result, id, file_name, *args, **kwargs): # fetch request info @@ -1020,12 +1097,11 @@ def industry_practices_subtask(self, prev_result, id, file_name, *args, **kwargs # check Gherkin IP try: # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( + proc = run_subprocess_wait( check_program, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT ) task.set_process_details(None, check_program) # run() has no pid...