Skip to content

Commit 7934e3f

Browse files
committed
Refactor and add async example for queue interactions
Removed obsolete client-side gRPC test script and updated the queue examples to enable all example cases. Introduced a new async example for sending and receiving messages using asyncio and KubeMQ queues for improved functionality.
1 parent b6c4615 commit 7934e3f

File tree

3 files changed

+68
-67
lines changed

3 files changed

+68
-67
lines changed

examples/queues/async.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import asyncio
2+
import logging
3+
import os
4+
from logging.config import dictConfig
5+
6+
7+
8+
from kubemq.queues import *
9+
10+
async def example_send_receive():
11+
with Client(
12+
address="localhost:50000",
13+
) as client:
14+
async def _send():
15+
while True:
16+
send_result = await client.send_queues_message_async(
17+
QueueMessage(
18+
channel="send_receive",
19+
body=b"message",
20+
),
21+
)
22+
if send_result.is_error:
23+
print(f"Error sending message: {send_result.error}")
24+
await asyncio.sleep(1)
25+
continue
26+
print(f"Queue Message Sent: {send_result}")
27+
await asyncio.sleep(1)
28+
29+
async def _receive():
30+
while True:
31+
send_receive_result = await client.receive_queues_messages_async(
32+
channel="send_receive",
33+
max_messages=1,
34+
wait_timeout_in_seconds=1,
35+
)
36+
if send_receive_result.is_error:
37+
print(f"Error receiving message: {send_receive_result.error}")
38+
await asyncio.sleep(1)
39+
continue
40+
for message in send_receive_result.messages:
41+
print(f"Id:{message.id}, Body:{message.body.decode('utf-8')}")
42+
message.ack()
43+
44+
await asyncio.sleep(1)
45+
46+
task1 = asyncio.create_task(_send())
47+
task2 = asyncio.create_task(_receive())
48+
tasks = [task1, task2]
49+
await asyncio.gather(*tasks)
50+
51+
52+
if __name__ == "__main__":
53+
asyncio.run(example_send_receive())
54+

examples/queues/queues.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -405,20 +405,20 @@ def example_with_wait_pull():
405405
if __name__ == "__main__":
406406
try:
407407
example_send_receive()
408-
# example_send_receive_with_auto_ack()
409-
# example_send_receive_with_dlq()
410-
# example_send_receive_with_delay()
411-
# example_send_receive_with_expiration()
412-
# example_with_message_ack()
413-
# example_with_message_reject()
414-
# example_with_message_requeue()
415-
# example_with_ack_all()
416-
# example_with_reject_all()
417-
# example_with_requeue_all()
418-
# example_with_visibility()
419-
# example_with_visibility_expired()
420-
# example_with_visibility_extension()
421-
# example_with_wait_pull()
408+
example_send_receive_with_auto_ack()
409+
example_send_receive_with_dlq()
410+
example_send_receive_with_delay()
411+
example_send_receive_with_expiration()
412+
example_with_message_ack()
413+
example_with_message_reject()
414+
example_with_message_requeue()
415+
example_with_ack_all()
416+
example_with_reject_all()
417+
example_with_requeue_all()
418+
example_with_visibility()
419+
example_with_visibility_expired()
420+
example_with_visibility_extension()
421+
example_with_wait_pull()
422422
except Exception as err:
423423
print(err)
424424
finally:

test/client.py

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)