Skip to content

Commit ac3e34b

Browse files
committed
Also pass the message id in the context. This can allow the server to detect if the message is being sent again, b/c the message had a visibility time out on a different worker.
1 parent fb8cce3 commit ac3e34b

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

pyqs/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ class TaskContext(object):
4343
""" Tasks may optionally accept a _context variable. If they do, an
4444
instance of this object is passed as the context. """
4545

46-
def __init__(self, conn, queue_url, receipt_handle):
46+
def __init__(self, conn, queue_url, message_id, receipt_handle):
4747
self.conn = conn
4848
self.queue_url = queue_url
49+
self.message_id = message_id
4950
self.receipt_handle = receipt_handle
5051

5152
def change_message_visibility(self, timeout=timedelta(minutes=10)):

pyqs/worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ def process_message(self):
186186
full_task_path = message_body['task']
187187
args = message_body['args']
188188
kwargs = message_body['kwargs']
189+
message_id = message['MessageId']
189190
receipt_handle = message['ReceiptHandle']
190191

191192
task_name = full_task_path.split(".")[-1]
@@ -201,6 +202,7 @@ def process_message(self):
201202
kwargs['_context'] = TaskContext(
202203
conn=self.conn,
203204
queue_url=queue_url,
205+
message_id=message_id,
204206
receipt_handle=receipt_handle
205207
)
206208

0 commit comments

Comments
 (0)