From 277a3ca9d40064e6802086d58539cb6367d15702 Mon Sep 17 00:00:00 2001 From: jhorman Date: Mon, 15 Jul 2019 18:28:33 -0700 Subject: [PATCH 1/3] Make the long polling interval a command line parameter option. Also lowers the default to 10, 20 is just a long time to wait, and leads to shutdowns taking a long time. --- pyqs/__init__.py | 2 +- pyqs/main.py | 18 ++++++++++++++---- pyqs/worker.py | 12 ++++++++---- tests/test_manager_worker.py | 7 ++++--- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pyqs/__init__.py b/pyqs/__init__.py index 14878b3..de6399c 100644 --- a/pyqs/__init__.py +++ b/pyqs/__init__.py @@ -1,4 +1,4 @@ from .decorator import task # noqa __title__ = 'pyqs' -__version__ = '0.1.2' +__version__ = '0.1.3' diff --git a/pyqs/main.py b/pyqs/main.py index 33b55b5..c7311db 100644 --- a/pyqs/main.py +++ b/pyqs/main.py @@ -107,6 +107,15 @@ def main(): action="store", ) + parser.add_argument( + "--long-polling-interval", + dest="long_polling_interval", + type=int, + default=10, + help='How long to poll SQS for a new message.', + action="store", + ) + args = parser.parse_args() _main( @@ -118,7 +127,8 @@ def main(): secret_access_key=args.secret_access_key, interval=args.interval, batchsize=args.batchsize, - prefetch_multiplier=args.prefetch_multiplier + prefetch_multiplier=args.prefetch_multiplier, + long_polling_interval=args.long_polling_interval ) @@ -130,7 +140,7 @@ def _add_cwd_to_path(): def _main(queue_prefixes, concurrency=5, logging_level="WARN", region=None, access_key_id=None, secret_access_key=None, - interval=1, batchsize=10, prefetch_multiplier=2): + interval=1, batchsize=10, prefetch_multiplier=2, long_polling_interval=10): logging.basicConfig( format="[%(levelname)s]: %(message)s", level=getattr(logging, logging_level), @@ -138,8 +148,8 @@ def _main(queue_prefixes, concurrency=5, logging_level="WARN", logger.info("Starting PyQS version {}".format(__version__)) manager = ManagerWorker( queue_prefixes, concurrency, interval, batchsize, - prefetch_multiplier=prefetch_multiplier, region=region, - access_key_id=access_key_id, secret_access_key=secret_access_key, + prefetch_multiplier=prefetch_multiplier, long_polling_interval=long_polling_interval, + region=region, access_key_id=access_key_id, secret_access_key=secret_access_key, ) _add_cwd_to_path() manager.start() diff --git a/pyqs/worker.py b/pyqs/worker.py index 0f3c584..48ec130 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -21,7 +21,7 @@ from pyqs.utils import get_aws_region_name, decode_message MESSAGE_DOWNLOAD_BATCH_SIZE = 10 -LONG_POLLING_INTERVAL = 20 +DEFAULT_LONG_POLLING_INTERVAL = 10 logger = logging.getLogger("pyqs") @@ -59,7 +59,7 @@ def parent_is_alive(self): class ReadWorker(BaseWorker): - def __init__(self, queue_url, internal_queue, batchsize, + def __init__(self, queue_url, internal_queue, batchsize, long_polling_interval=DEFAULT_LONG_POLLING_INTERVAL, connection_args=None, *args, **kwargs): super(ReadWorker, self).__init__(*args, **kwargs) if connection_args is None: @@ -74,6 +74,7 @@ def __init__(self, queue_url, internal_queue, batchsize, self.internal_queue = internal_queue self.batchsize = batchsize + self.long_polling_interval = long_polling_interval def run(self): # Set the child process to not receive any keyboard interrupts @@ -91,7 +92,7 @@ def read_message(self): messages = self.conn.receive_message( QueueUrl=self.queue_url, MaxNumberOfMessages=self.batchsize, - WaitTimeSeconds=LONG_POLLING_INTERVAL, + WaitTimeSeconds=self.long_polling_interval, ).get('Messages', []) logger.debug( @@ -236,7 +237,7 @@ def process_message(self): class ManagerWorker(object): def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize, - prefetch_multiplier=2, region=None, access_key_id=None, + prefetch_multiplier=2, long_polling_interval=DEFAULT_LONG_POLLING_INTERVAL, region=None, access_key_id=None, secret_access_key=None): self.connection_args = { "region": region, @@ -250,6 +251,7 @@ def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize, self.batchsize = 1 self.interval = interval self.prefetch_multiplier = prefetch_multiplier + self.long_polling_interval = long_polling_interval self.load_queue_prefixes(queue_prefixes) self.queue_urls = self.get_queue_urls_from_queue_prefixes( self.queue_prefixes) @@ -272,6 +274,7 @@ def _initialize_reader_children(self): self.reader_children.append( ReadWorker( queue_url, self.internal_queue, self.batchsize, + long_polling_interval=self.long_polling_interval, connection_args=self.connection_args, parent_id=self._pid, ) @@ -370,6 +373,7 @@ def _replace_reader_children(self): self.reader_children.pop(index) worker = ReadWorker( queue_url, self.internal_queue, self.batchsize, + long_polling_interval=self.long_polling_interval, connection_args=self.connection_args, parent_id=self._pid, ) diff --git a/tests/test_manager_worker.py b/tests/test_manager_worker.py index deef387..8a0593b 100644 --- a/tests/test_manager_worker.py +++ b/tests/test_manager_worker.py @@ -100,6 +100,7 @@ def test_main_method(ManagerWorker): ManagerWorker.assert_called_once_with( ['email1', 'email2'], 2, 1, 10, prefetch_multiplier=2, + long_polling_interval=10, region=None, secret_access_key=None, access_key_id=None, ) ManagerWorker.return_value.start.assert_called_once_with() @@ -116,12 +117,13 @@ def test_real_main_method(ArgumentParser, _main): ArgumentParser.return_value.parse_args.return_value = Mock( concurrency=3, queues=["email1"], interval=1, batchsize=10, logging_level="WARN", region='us-east-1', prefetch_multiplier=2, + long_polling_interval=3, access_key_id=None, secret_access_key=None, ) main() _main.assert_called_once_with( - queue_prefixes=['email1'], concurrency=3, interval=1, batchsize=10, + queue_prefixes=['email1'], concurrency=3, interval=1, batchsize=10, long_polling_interval=3, logging_level="WARN", region='us-east-1', prefetch_multiplier=2, access_key_id=None, secret_access_key=None, ) @@ -288,7 +290,6 @@ def process_counts(): sys.exit.assert_called_once_with(0) -@patch("pyqs.worker.LONG_POLLING_INTERVAL", 3) @mock_sqs @mock_sqs_deprecated def test_master_shuts_down_busy_read_workers(): @@ -338,7 +339,7 @@ def sleep_and_kill(pid): # Setup Manager manager = ManagerWorker( queue_prefixes=["tester"], worker_concurrency=0, interval=0.0, - batchsize=1, + batchsize=1, long_polling_interval=3 ) manager.start() From 8bcb1533eed4babbb621a52ff936028ab9970fd2 Mon Sep 17 00:00:00 2001 From: jhorman Date: Mon, 15 Jul 2019 20:10:59 -0700 Subject: [PATCH 2/3] Fixing pep E501 --- tests/test_manager_worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_manager_worker.py b/tests/test_manager_worker.py index 8a0593b..1a18634 100644 --- a/tests/test_manager_worker.py +++ b/tests/test_manager_worker.py @@ -123,9 +123,9 @@ def test_real_main_method(ArgumentParser, _main): main() _main.assert_called_once_with( - queue_prefixes=['email1'], concurrency=3, interval=1, batchsize=10, long_polling_interval=3, - logging_level="WARN", region='us-east-1', prefetch_multiplier=2, - access_key_id=None, secret_access_key=None, + queue_prefixes=['email1'], concurrency=3, interval=1, batchsize=10, + long_polling_interval=3, logging_level="WARN", region='us-east-1', + prefetch_multiplier=2, access_key_id=None, secret_access_key=None, ) From cc5d5b9b267aa2e95c1b970fb1dc56303226598a Mon Sep 17 00:00:00 2001 From: jhorman Date: Thu, 18 Jul 2019 12:40:48 -0700 Subject: [PATCH 3/3] Don't bump version --- pyqs/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyqs/__init__.py b/pyqs/__init__.py index de6399c..14878b3 100644 --- a/pyqs/__init__.py +++ b/pyqs/__init__.py @@ -1,4 +1,4 @@ from .decorator import task # noqa __title__ = 'pyqs' -__version__ = '0.1.3' +__version__ = '0.1.2'