Skip to content

Terminate subprocesses in celery after timeout #142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 92 additions & 16 deletions backend/apps/ifc_validation/tasks.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,101 @@
from dataclasses import dataclass
import os
import sys
import datetime
import subprocess
import functools
import json
import time
import psutil
import ifcopenshell

from celery import shared_task, chain, chord, group
from celery.utils.log import get_task_logger
from celery.exceptions import SoftTimeLimitExceeded
from django.db import transaction

from core.utils import log_execution

from apps.ifc_validation_models.settings import TASK_TIMEOUT_LIMIT, MEDIA_ROOT
from apps.ifc_validation_models.settings import MEDIA_ROOT
from apps.ifc_validation_models.decorators import requires_django_user_context
from apps.ifc_validation_models.models import *

from .email_tasks import *

logger = get_task_logger(__name__)

def terminate_subprocesses():
"""Helper function to terminate all subprocesses of the current process."""
parent = psutil.Process()
children = parent.children(recursive=True)
logger.debug(f"found child pids: {' '.join(map(str, (c.pid for c in children)))}")

for child in children:
try:
child.terminate()
logger.debug(f"terminated {child.pid}")
except psutil.NoSuchProcess:
logger.debug(f"no such process {child.pid}")
pass

_, alive = psutil.wait_procs(children, timeout=10)
logger.debug(f"processes still alive: {' '.join(map(str, (c.pid for c in alive)))}")
for child in alive:
try:
child.kill()
logger.debug(f"killed {child.pid}")
except psutil.NoSuchProcess:
logger.debug(f"no such process {child.pid}")
pass


def kill_subprocesses_on_timeout(task_func):
"""Decorator to catch celery's soft time out limit and generically terminate all subprocesses"""
@functools.wraps(task_func)
def wrapper(*args, **kwargs):
try:
return task_func(*args, **kwargs)
except SoftTimeLimitExceeded:
terminate_subprocesses()
raise
return wrapper


@dataclass
class proc_output:
returncode : int
stdout : str
stderr : str


def run_subprocess_wait(*args, check=False, **kwargs):
"""
Equivalent of subprocess.run() but with a loop to keep in
user code and terminating the process when catching the
soft timeout exception in celery.
"""
process = subprocess.Popen(*args, **kwargs)

try:
while True:
retcode = process.poll()
if retcode is not None:
break
time.sleep(0.1)

except BaseException as e:
print("Interrupt received, terminating process...")
process.terminate()
process.wait()
print("Process terminated.")
raise e

if check and retcode != 0:
raise subprocess.CalledProcessError(retcode, args[0])

stdout, stderr = process.communicate()
return proc_output(retcode, stdout, stderr)


@functools.lru_cache(maxsize=1024)
def get_absolute_file_path(file_name):
Expand Down Expand Up @@ -242,6 +318,7 @@ def instance_completion_subtask(self, prev_result, id, file_name, *args, **kwarg
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def syntax_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand All @@ -266,12 +343,11 @@ def syntax_validation_subtask(self, prev_result, id, file_name, *args, **kwargs)

# note: use run instead of Popen b/c PIPE output can be very big...
task.set_process_details(None, check_program) # run() has no pid...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT
)

# parse output
Expand Down Expand Up @@ -358,7 +434,7 @@ def parse_info_subtask(self, prev_result, id, file_name, *args, **kwargs):
code = "import ifcopenshell; ifcopenshell.open('" + file_path + "')"
check_program = [sys.executable, '-c', code, file_path]
logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}')
subprocess.run(check_program, check=True)
run_subprocess_wait(check_program, check=True)

except subprocess.CalledProcessError as err:

Expand Down Expand Up @@ -516,6 +592,7 @@ def parse_info_subtask(self, prev_result, id, file_name, *args, **kwargs):
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand Down Expand Up @@ -543,12 +620,11 @@ def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs):
# check Gherkin IP
try:
# note: use run instead of Popen b/c PIPE output can be very big...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT,
env=os.environ.copy()
)
task.set_process_details(None, check_program) # run() has no pid...
Expand Down Expand Up @@ -600,6 +676,7 @@ def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs):
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def schema_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand Down Expand Up @@ -627,12 +704,11 @@ def schema_validation_subtask(self, prev_result, id, file_name, *args, **kwargs)
# check schema
try:
# note: use run instead of Popen b/c PIPE output can be very big...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT
)
task.set_process_details(None, check_program) # run() has no pid...
except subprocess.TimeoutExpired as err:
Expand Down Expand Up @@ -735,6 +811,7 @@ def is_schema_error(line):
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand Down Expand Up @@ -762,12 +839,11 @@ def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):
# check bSDD
try:
# note: use run instead of Popen b/c PIPE output can be very big...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT,
env=os.environ.copy()
)
task.set_process_details(None, check_program) # run() has no pid...
Expand Down Expand Up @@ -839,6 +915,7 @@ def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand Down Expand Up @@ -866,12 +943,11 @@ def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *arg
# check Gherkin IA
try:
# note: use run instead of Popen b/c PIPE output can be very big...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT,
env=os.environ.copy()
)
task.set_process_details(None, check_program) # run() has no pid...
Expand Down Expand Up @@ -917,6 +993,7 @@ def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *arg
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand Down Expand Up @@ -944,12 +1021,11 @@ def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *arg
# check Gherkin IP
try:
# note: use run instead of Popen b/c PIPE output can be very big...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT
)
task.set_process_details(None, check_program) # run() has no pid...

Expand Down Expand Up @@ -993,6 +1069,7 @@ def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *arg
@shared_task(bind=True)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def industry_practices_subtask(self, prev_result, id, file_name, *args, **kwargs):

# fetch request info
Expand Down Expand Up @@ -1020,12 +1097,11 @@ def industry_practices_subtask(self, prev_result, id, file_name, *args, **kwargs
# check Gherkin IP
try:
# note: use run instead of Popen b/c PIPE output can be very big...
proc = subprocess.run(
proc = run_subprocess_wait(
check_program,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=TASK_TIMEOUT_LIMIT
)
task.set_process_details(None, check_program) # run() has no pid...

Expand Down