From 20835bef21c5076a3eaed36a6af4bfe67749656f Mon Sep 17 00:00:00 2001 From: Artur Maciag Date: Wed, 7 Nov 2018 12:10:35 +0000 Subject: [PATCH] Base Worker shutdown fix --- development.txt | 2 +- pyqs/worker.py | 1 + tests/test_manager_worker.py | 113 ++++++++++++++++++----------------- 3 files changed, 60 insertions(+), 56 deletions(-) diff --git a/development.txt b/development.txt index d0167fb..fca9db3 100644 --- a/development.txt +++ b/development.txt @@ -1,6 +1,6 @@ coverage==4.4.1 mock==1.0.1 -moto==1.3.4 +moto==1.3.7 nose==1.3.0 pre-commit==0.7.6 sure==1.2.2 diff --git a/pyqs/worker.py b/pyqs/worker.py index d33e539..a4a6a08 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -37,6 +37,7 @@ def __init__(self, *args, **kwargs): def shutdown(self): logger.info("Received shutdown signal, shutting down PID {}!".format(os.getpid())) self.should_exit.set() + self.join() def parent_is_alive(self): if os.getppid() == 1: diff --git a/tests/test_manager_worker.py b/tests/test_manager_worker.py index beefcb2..b4a870c 100644 --- a/tests/test_manager_worker.py +++ b/tests/test_manager_worker.py @@ -7,6 +7,7 @@ import boto3 from mock import patch, Mock, MagicMock from moto import mock_sqs, mock_sqs_deprecated +from nose.tools import timed from pyqs.main import main, _main from pyqs.worker import ManagerWorker @@ -145,36 +146,6 @@ def test_master_spawns_worker_processes(): manager.stop() -@mock_sqs -@mock_sqs_deprecated -def test_master_replaces_reader_processes(): - """ - Test managing process replaces reader children - """ - - # Setup SQS Queue - conn = boto3.client('sqs', region_name='us-east-1') - conn.create_queue(QueueName="tester") - - # Setup Manager - manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10) - manager.start() - - # Get Reader PID - pid = manager.reader_children[0].pid - - # Kill Reader and wait to replace - manager.reader_children[0].shutdown() - time.sleep(0.1) - manager.replace_workers() - - # Check Replacement - manager.reader_children[0].pid.shouldnt.equal(pid) - - # Cleanup - manager.stop() - - @mock_sqs @mock_sqs_deprecated def test_master_counts_processes(): @@ -210,31 +181,63 @@ def test_master_counts_processes(): @mock_sqs @mock_sqs_deprecated -def test_master_replaces_worker_processes(): - """ - Test managing process replaces worker processes - """ - # Setup SQS Queue - conn = boto3.client('sqs', region_name='us-east-1') - conn.create_queue(QueueName="tester") - - # Setup Manager - manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10) - manager.start() - - # Get Worker PID - pid = manager.worker_children[0].pid - - # Kill Worker and wait to replace - manager.worker_children[0].shutdown() - time.sleep(0.1) - manager.replace_workers() - - # Check Replacement - manager.worker_children[0].pid.shouldnt.equal(pid) - - # Cleanup - manager.stop() +class TestMasterWorker: + + def setup(self): + # For debugging test + import sys + logger = logging.getLogger("pyqs") + logger.setLevel(logging.DEBUG) + stdout_handler = logging.StreamHandler(sys.stdout) + logger.addHandler(stdout_handler) + + # Setup SQS Queue + self.conn = boto3.client('sqs', region_name='us-east-1') + self.conn.create_queue(QueueName="tester") + + # Setup Manager + self.manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=1, interval=1, + batchsize=10, + ) + self.manager.start() + + def teardown(self): + self.manager.stop() + + @timed(60) + def test_master_replaces_reader_processes(self): + """ + Test managing process replaces reader children + """ + # Get Reader PID + pid = self.manager.reader_children[0].pid + + # Kill Reader and wait to replace + self.manager.reader_children[0].shutdown() + time.sleep(0.1) + self.manager.reader_children[0].is_alive().should.equal(False) + self.manager.replace_workers() + + # Check Replacement + self.manager.reader_children[0].pid.shouldnt.equal(pid) + + @timed(60) + def test_master_replaces_worker_processes(self): + """ + Test managing process replaces worker processes + """ + # Get Worker PID + pid = self.manager.worker_children[0].pid + + # Kill Worker and wait to replace + self.manager.worker_children[0].shutdown() + time.sleep(0.1) + self.manager.worker_children[0].is_alive().should.equal(False) + self.manager.replace_workers() + + # Check Replacement + self.manager.worker_children[0].pid.shouldnt.equal(pid) @mock_sqs