Skip to content

Commit fb8cce3

Browse files
committed
Allow tasks to change the visibility timeout of the message they are handling.
When a task fails, immediately make it available again instead of waiting for visibility timeout. Fixes #47
1 parent df7edbd commit fb8cce3

File tree

4 files changed

+50
-3
lines changed

4 files changed

+50
-3
lines changed

pyqs/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from .decorator import task # noqa
22

33
__title__ = 'pyqs'
4-
__version__ = '0.1.2'
4+
__version__ = '0.1.4'

pyqs/decorator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ def wrapper(*args, **kwargs):
5454

5555

5656
class task(object):
57+
""" Decorator that enables sqs based task execution. If the function
58+
accepts an optional `_context` argument, an instance of TaskContext is
59+
passed to the task function. The context allows the function to do things
60+
like change message visibility. """
61+
5762
def __init__(self, queue=None, delay_seconds=None,
5863
custom_function_path=None):
5964
self.queue_name = queue

pyqs/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pickle
44

55
import boto3
6+
from datetime import timedelta
67

78

89
def decode_message(message):
@@ -36,3 +37,20 @@ def get_aws_region_name():
3637
region_name = 'us-east-1'
3738

3839
return region_name
40+
41+
42+
class TaskContext(object):
43+
""" Tasks may optionally accept a _context variable. If they do, an
44+
instance of this object is passed as the context. """
45+
46+
def __init__(self, conn, queue_url, receipt_handle):
47+
self.conn = conn
48+
self.queue_url = queue_url
49+
self.receipt_handle = receipt_handle
50+
51+
def change_message_visibility(self, timeout=timedelta(minutes=10)):
52+
self.conn.change_message_visibility(
53+
QueueUrl=self.queue_url,
54+
ReceiptHandle=self.receipt_handle,
55+
VisibilityTimeout=int(timeout.total_seconds())
56+
)

pyqs/worker.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@
1111
import time
1212

1313
from multiprocessing import Event, Process, Queue
14+
1415
try:
1516
from queue import Empty, Full
1617
except ImportError:
1718
from Queue import Empty, Full
1819

20+
try:
21+
from inspect import getfullargspec as get_args
22+
except ImportError:
23+
from inspect import getargspec as get_args
24+
1925
import boto3
2026

21-
from pyqs.utils import get_aws_region_name, decode_message
27+
from pyqs.utils import get_aws_region_name, decode_message, TaskContext
2228

2329
MESSAGE_DOWNLOAD_BATCH_SIZE = 10
2430
LONG_POLLING_INTERVAL = 20
@@ -180,6 +186,7 @@ def process_message(self):
180186
full_task_path = message_body['task']
181187
args = message_body['args']
182188
kwargs = message_body['kwargs']
189+
receipt_handle = message['ReceiptHandle']
183190

184191
task_name = full_task_path.split(".")[-1]
185192
task_path = ".".join(full_task_path.split(".")[:-1])
@@ -188,6 +195,15 @@ def process_message(self):
188195

189196
task = getattr(task_module, task_name)
190197

198+
# if the task accepts the optional _context argument, pass it the TaskContext
199+
if '_context' in get_args(task).args:
200+
kwargs = dict(kwargs)
201+
kwargs['_context'] = TaskContext(
202+
conn=self.conn,
203+
queue_url=queue_url,
204+
receipt_handle=receipt_handle
205+
)
206+
191207
current_time = time.time()
192208
if int(current_time - fetch_time) >= timeout:
193209
logger.warning(
@@ -214,12 +230,20 @@ def process_message(self):
214230
traceback.format_exc(),
215231
)
216232
)
233+
234+
# since the task failed, mark it is available again quickly (10 seconds)
235+
self.conn.change_message_visibility(
236+
QueueUrl=queue_url,
237+
ReceiptHandle=receipt_handle,
238+
VisibilityTimeout=10
239+
)
240+
217241
return True
218242
else:
219243
end_time = time.clock()
220244
self.conn.delete_message(
221245
QueueUrl=queue_url,
222-
ReceiptHandle=message['ReceiptHandle']
246+
ReceiptHandle=receipt_handle
223247
)
224248
logger.info(
225249
"Processed task {} in {:.4f} seconds with args: {} "

0 commit comments

Comments
 (0)