1
1
from time import sleep
2
2
3
3
from kubemq .queues import *
4
+
5
+
4
6
def example_send_receive ():
5
7
client = Client (address = "localhost:50000" )
6
8
send_result = client .send_queues_message (
@@ -21,6 +23,7 @@ def example_send_receive():
21
23
message .ack ()
22
24
client .close ()
23
25
26
+
24
27
def example_send_receive_with_auto_ack ():
25
28
client = Client (address = "localhost:50000" )
26
29
send_result = client .send_queues_message (
@@ -32,10 +35,7 @@ def example_send_receive_with_auto_ack():
32
35
print (f"Queue Message Sent: { send_result } " )
33
36
34
37
auto_ack_result = client .receive_queues_messages (
35
- channel = "auto_ack" ,
36
- max_messages = 1 ,
37
- wait_timeout_in_seconds = 10 ,
38
- auto_ack = True
38
+ channel = "auto_ack" , max_messages = 1 , wait_timeout_in_seconds = 10 , auto_ack = True
39
39
)
40
40
for message in auto_ack_result .messages :
41
41
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -44,44 +44,51 @@ def example_send_receive_with_auto_ack():
44
44
45
45
def example_send_receive_with_dlq ():
46
46
client = Client (address = "localhost:50000" )
47
- for i in range (10 ):
48
- send_result = client .send_queues_message (
49
- QueueMessage (
50
- channel = "before_dlq" ,
51
- body = f"Message { i + 1 } " .encode ('utf-8' ),
52
- metadata = "some-metadata" ,
53
- attempts_before_dead_letter_queue = 1 ,
54
- dead_letter_queue = "dlq"
55
- ))
56
- print (f"Queue Message Sent: { send_result } " )
57
-
58
- dlq_result = client .receive_queues_messages (
59
- channel = "before_dlq" ,
60
- max_messages = 10 ,
61
- wait_timeout_in_seconds = 10 ,
47
+ send_result = client .send_queues_message (
48
+ QueueMessage (
49
+ channel = "python_process_queue" ,
50
+ body = f"Message" .encode ("utf-8" ),
51
+ metadata = "some-metadata" ,
52
+ attempts_before_dead_letter_queue = 4 ,
53
+ dead_letter_queue = "dlq_python_process_queue" ,
54
+ )
62
55
)
63
- for message in dlq_result .messages :
64
- print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
65
- message .reject ()
56
+ print (f"Queue Message Sent: { send_result } " )
66
57
67
- dlq_result = client .receive_queues_messages (
68
- channel = "dlq" ,
69
- max_messages = 10 ,
70
- wait_timeout_in_seconds = 10 ,
58
+ for i in range (2 ):
59
+ receive_result = client .receive_queues_messages (
60
+ channel = "python_process_queue" ,
61
+ max_messages = 1 ,
62
+ wait_timeout_in_seconds = 2 ,
63
+ )
64
+ if len (receive_result .messages ) == 0 :
65
+ print ("No more messages" )
66
+ break
67
+ for message in receive_result .messages :
68
+ print (
69
+ f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} , Receive Count:{ message .receive_count } "
70
+ )
71
+ message .reject ()
72
+ receive_result = client .receive_queues_messages (
73
+ channel = "python_process_queue" ,
74
+ max_messages = 1 ,
75
+ wait_timeout_in_seconds = 2 ,
71
76
)
72
- for message in dlq_result .messages :
73
- print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
74
- message .ack ()
77
+ if len (receive_result .messages ) == 0 :
78
+ print ("No more messages" )
79
+ for message in receive_result .messages :
80
+ print (
81
+ f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} , Receive Count:{ message .receive_count } "
82
+ )
83
+ message .re_queue ("python_process_re_queue" )
84
+
75
85
client .close ()
76
86
87
+
77
88
def example_send_receive_with_delay ():
78
89
client = Client (address = "localhost:50000" )
79
90
send_result = client .send_queues_message (
80
- QueueMessage (
81
- channel = "delay" ,
82
- body = b"message with delay" ,
83
- delay_in_seconds = 5
84
- )
91
+ QueueMessage (channel = "delay" , body = b"message with delay" , delay_in_seconds = 5 )
85
92
)
86
93
print (f"Queue Message Sent: { send_result } " )
87
94
@@ -98,13 +105,14 @@ def example_send_receive_with_delay():
98
105
message .ack ()
99
106
client .close ()
100
107
108
+
101
109
def example_send_receive_with_expiration ():
102
110
client = Client (address = "localhost:50000" )
103
111
send_result = client .send_queues_message (
104
112
QueueMessage (
105
113
channel = "expiration" ,
106
114
body = b"message with expiration" ,
107
- expiration_in_seconds = 5
115
+ expiration_in_seconds = 5 ,
108
116
)
109
117
)
110
118
print (f"Queue Message Sent: { send_result } " )
@@ -117,6 +125,8 @@ def example_send_receive_with_expiration():
117
125
)
118
126
print (f"Received { len (expiration_result .messages )} messages" )
119
127
client .close ()
128
+
129
+
120
130
def example_with_message_ack ():
121
131
client = Client (address = "localhost:50000" )
122
132
send_result = client .send_queues_message (
@@ -131,7 +141,7 @@ def example_with_message_ack():
131
141
channel = "message_ack" ,
132
142
max_messages = 1 ,
133
143
wait_timeout_in_seconds = 10 ,
134
- auto_ack = False
144
+ auto_ack = False ,
135
145
)
136
146
for message in message_ack_result .messages :
137
147
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -153,7 +163,7 @@ def example_with_message_reject():
153
163
channel = "message_reject" ,
154
164
max_messages = 1 ,
155
165
wait_timeout_in_seconds = 10 ,
156
- auto_ack = False
166
+ auto_ack = False ,
157
167
)
158
168
for message in message_reject_result .messages :
159
169
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -162,13 +172,14 @@ def example_with_message_reject():
162
172
channel = "message_reject" ,
163
173
max_messages = 1 ,
164
174
wait_timeout_in_seconds = 10 ,
165
- auto_ack = False
175
+ auto_ack = False ,
166
176
)
167
177
for message in message_reject_result .messages :
168
178
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
169
179
message .ack ()
170
180
client .close ()
171
181
182
+
172
183
def example_with_message_requeue ():
173
184
client = Client (address = "localhost:50000" )
174
185
send_result = client .send_queues_message (
@@ -183,7 +194,7 @@ def example_with_message_requeue():
183
194
channel = "message_requeue" ,
184
195
max_messages = 1 ,
185
196
wait_timeout_in_seconds = 10 ,
186
- auto_ack = False
197
+ auto_ack = False ,
187
198
)
188
199
for message in message_requeue_result .messages :
189
200
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -192,18 +203,20 @@ def example_with_message_requeue():
192
203
channel = "requeue_channel" ,
193
204
max_messages = 1 ,
194
205
wait_timeout_in_seconds = 10 ,
195
- auto_ack = False
206
+ auto_ack = False ,
196
207
)
197
208
for message in message_requeue_result .messages :
198
209
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
199
210
message .ack ()
211
+
212
+
200
213
def example_with_ack_all ():
201
214
client = Client (address = "localhost:50000" )
202
215
for i in range (10 ):
203
216
send_result = client .send_queues_message (
204
217
QueueMessage (
205
218
channel = "ack_all" ,
206
- body = f"Message { i + 1 } " .encode (' utf-8' ),
219
+ body = f"Message { i + 1 } " .encode (" utf-8" ),
207
220
metadata = "some-metadata" ,
208
221
)
209
222
)
@@ -225,7 +238,7 @@ def example_with_reject_all():
225
238
send_result = client .send_queues_message (
226
239
QueueMessage (
227
240
channel = "reject_all" ,
228
- body = f"Message { i + 1 } " .encode (' utf-8' ),
241
+ body = f"Message { i + 1 } " .encode (" utf-8" ),
229
242
)
230
243
)
231
244
print (f"Queue Message Sent: { send_result } " )
@@ -249,13 +262,14 @@ def example_with_reject_all():
249
262
reject_all_result .ack_all ()
250
263
client .close ()
251
264
265
+
252
266
def example_with_requeue_all ():
253
267
client = Client (address = "localhost:50000" )
254
268
for i in range (10 ):
255
269
send_result = client .send_queues_message (
256
270
QueueMessage (
257
271
channel = "requeue_all" ,
258
- body = f"Message { i + 1 } " .encode (' utf-8' ),
272
+ body = f"Message { i + 1 } " .encode (" utf-8" ),
259
273
)
260
274
)
261
275
print (f"Queue Message Sent: { send_result } " )
@@ -295,14 +309,15 @@ def example_with_visibility():
295
309
max_messages = 1 ,
296
310
wait_timeout_in_seconds = 10 ,
297
311
auto_ack = False ,
298
- visibility_seconds = 2
312
+ visibility_seconds = 2 ,
299
313
)
300
314
for message in result .messages :
301
315
print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
302
316
time .sleep (1 )
303
317
message .ack ()
304
318
client .close ()
305
319
320
+
306
321
def example_with_visibility_expired ():
307
322
client = Client (address = "localhost:50000" )
308
323
send_result = client .send_queues_message (
@@ -318,7 +333,7 @@ def example_with_visibility_expired():
318
333
max_messages = 1 ,
319
334
wait_timeout_in_seconds = 10 ,
320
335
auto_ack = False ,
321
- visibility_seconds = 2
336
+ visibility_seconds = 2 ,
322
337
)
323
338
for message in result .messages :
324
339
try :
@@ -329,6 +344,7 @@ def example_with_visibility_expired():
329
344
print (err )
330
345
client .close ()
331
346
347
+
332
348
def example_with_visibility_extension ():
333
349
client = Client (address = "localhost:50000" )
334
350
send_result = client .send_queues_message (
@@ -344,7 +360,7 @@ def example_with_visibility_extension():
344
360
max_messages = 1 ,
345
361
wait_timeout_in_seconds = 10 ,
346
362
auto_ack = False ,
347
- visibility_seconds = 2
363
+ visibility_seconds = 2 ,
348
364
)
349
365
for message in result .messages :
350
366
try :
@@ -357,6 +373,7 @@ def example_with_visibility_extension():
357
373
print (err )
358
374
client .close ()
359
375
376
+
360
377
def example_with_wait_pull ():
361
378
client = Client (address = "localhost:50000" )
362
379
send_result = client .send_queues_message (
@@ -384,12 +401,13 @@ def example_with_wait_pull():
384
401
print (f"Pull Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
385
402
client .close ()
386
403
404
+
387
405
if __name__ == "__main__" :
388
406
try :
389
407
390
- example_send_receive ()
408
+ # example_send_receive()
391
409
# example_send_receive_with_auto_ack()
392
- # example_send_receive_with_dlq()
410
+ example_send_receive_with_dlq ()
393
411
# example_send_receive_with_delay()
394
412
# example_send_receive_with_expiration()
395
413
# example_with_message_ack()
0 commit comments