Skip to content

Commit c632b72

Browse files
Copilotgonzalocasas
andcommitted
Complete ZeroMQ transport with integration tests and examples
Co-authored-by: gonzalocasas <933277+gonzalocasas@users.noreply.github.com>
1 parent 2900dd6 commit c632b72

File tree

3 files changed

+176
-13
lines changed

3 files changed

+176
-13
lines changed

setup.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ def read(*names, **kwargs):
2121

2222
long_description = read("README.md")
2323
requirements = read("requirements.txt").split("\n")
24-
optional_requirements = {}
24+
optional_requirements = {
25+
"zeromq": ["pyzmq>=19.0"],
26+
}
2527

2628
setup(
2729
name="compas_eve",

src/compas_eve/zeromq/__init__.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,20 @@ class ZeroMQTransport(Transport, EventEmitterMixin):
3535
----------
3636
endpoint : str
3737
Endpoint for the pub/sub communication, e.g. ``tcp://localhost:5555`` or ``inproc://test``.
38-
Publishers will connect to this endpoint, subscribers will bind to it.
38+
bind_subscriber : bool, optional
39+
If True, the subscriber socket will bind to the endpoint and publisher will connect.
40+
If False, the publisher will bind to the endpoint and subscriber will connect.
41+
Defaults to True for most use cases.
3942
"""
4043

41-
def __init__(self, endpoint, *args, **kwargs):
44+
def __init__(self, endpoint, bind_subscriber=True, *args, **kwargs):
4245
if zmq is None:
4346
raise ImportError("pyzmq is required for ZeroMQ transport. Please install it with: pip install pyzmq")
4447

4548
super(ZeroMQTransport, self).__init__(*args, **kwargs)
4649

4750
self.endpoint = endpoint
51+
self.bind_subscriber = bind_subscriber
4852
self._is_connected = False
4953
self._local_callbacks = {}
5054

@@ -53,18 +57,15 @@ def __init__(self, endpoint, *args, **kwargs):
5357
self.pub_socket = self.context.socket(zmq.PUB)
5458
self.sub_socket = self.context.socket(zmq.SUB)
5559

56-
# Publisher connects to endpoint, subscriber binds to it
57-
# This allows multiple publishers to connect to one subscriber endpoint
58-
try:
60+
# Configure sockets based on bind_subscriber setting
61+
if self.bind_subscriber:
62+
# Subscriber binds, publisher connects - good for many publishers, few subscribers
5963
self.sub_socket.bind(self.endpoint)
6064
self.pub_socket.connect(self.endpoint)
61-
except zmq.ZMQError as e:
62-
# If bind fails, try the reverse (useful for tcp endpoints that might be in use)
63-
try:
64-
self.pub_socket.connect(self.endpoint)
65-
self.sub_socket.connect(self.endpoint)
66-
except zmq.ZMQError:
67-
raise e
65+
else:
66+
# Publisher binds, subscriber connects - good for one publisher, many subscribers
67+
self.pub_socket.bind(self.endpoint)
68+
self.sub_socket.connect(self.endpoint)
6869

6970
# Set up polling for subscriber
7071
self.poller = zmq.Poller()

tests/integration/test_zeromq.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from threading import Event
2+
3+
from compas_eve import Message
4+
from compas_eve import Publisher
5+
from compas_eve import Subscriber
6+
from compas_eve import Topic
7+
from compas_eve import set_default_transport
8+
9+
try:
10+
from compas_eve import ZeroMQTransport
11+
ZEROMQ_AVAILABLE = True
12+
except ImportError:
13+
ZEROMQ_AVAILABLE = False
14+
15+
import pytest
16+
17+
18+
@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available")
19+
def test_zeromq_tcp_pubsub():
20+
"""Test ZeroMQ transport with TCP endpoints."""
21+
tx = ZeroMQTransport("tcp://localhost:25555")
22+
event = Event()
23+
topic = Topic("/messages_compas_eve_test/tcp_pubsub/", Message)
24+
25+
Subscriber(topic, lambda m: event.set(), transport=tx).subscribe()
26+
27+
# Small delay to ensure subscriber is ready
28+
import time
29+
time.sleep(0.1)
30+
31+
Publisher(topic, transport=tx).publish(Message(done=True))
32+
33+
received = event.wait(timeout=3)
34+
assert received, "Message not received"
35+
36+
tx.close()
37+
38+
39+
@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available")
40+
def test_zeromq_tcp_message_content():
41+
"""Test that message content is preserved correctly with TCP transport."""
42+
tx = ZeroMQTransport("tcp://localhost:25556")
43+
event = Event()
44+
topic = Topic("/messages_compas_eve_test/tcp_content/", Message)
45+
46+
result = {}
47+
48+
def callback(msg):
49+
result["message"] = msg
50+
event.set()
51+
52+
Subscriber(topic, callback, transport=tx).subscribe()
53+
54+
# Small delay to ensure subscriber is ready
55+
import time
56+
time.sleep(0.1)
57+
58+
test_message = Message(
59+
name="ZeroMQ Test",
60+
value=42,
61+
nested={"key": "value", "list": [1, 2, 3]}
62+
)
63+
Publisher(topic, transport=tx).publish(test_message)
64+
65+
received = event.wait(timeout=3)
66+
assert received, "Message not received"
67+
assert result["message"].name == "ZeroMQ Test"
68+
assert result["message"].value == 42
69+
assert result["message"].nested["key"] == "value"
70+
assert result["message"].nested["list"] == [1, 2, 3]
71+
72+
tx.close()
73+
74+
75+
@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available")
76+
def test_zeromq_tcp_multiple_topics():
77+
"""Test that multiple topics work correctly with TCP transport."""
78+
tx = ZeroMQTransport("tcp://localhost:25557")
79+
80+
topic1 = Topic("/test/topic1", Message)
81+
topic2 = Topic("/test/topic2", Message)
82+
83+
event1 = Event()
84+
event2 = Event()
85+
86+
result = {}
87+
88+
def callback1(msg):
89+
result["topic1"] = msg
90+
event1.set()
91+
92+
def callback2(msg):
93+
result["topic2"] = msg
94+
event2.set()
95+
96+
# Subscribe to both topics
97+
Subscriber(topic1, callback1, transport=tx).subscribe()
98+
Subscriber(topic2, callback2, transport=tx).subscribe()
99+
100+
# Small delay to ensure subscribers are ready
101+
import time
102+
time.sleep(0.1)
103+
104+
# Publish to topic1
105+
Publisher(topic1, transport=tx).publish(Message(source="topic1"))
106+
107+
# Publish to topic2
108+
Publisher(topic2, transport=tx).publish(Message(source="topic2"))
109+
110+
received1 = event1.wait(timeout=3)
111+
received2 = event2.wait(timeout=3)
112+
113+
assert received1, "Message 1 not received"
114+
assert received2, "Message 2 not received"
115+
assert result["topic1"].source == "topic1"
116+
assert result["topic2"].source == "topic2"
117+
118+
tx.close()
119+
120+
121+
@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available")
122+
def test_zeromq_tcp_unsubscribe():
123+
"""Test unsubscribe functionality with TCP transport."""
124+
tx = ZeroMQTransport("tcp://localhost:25558")
125+
topic = Topic("/test/unsub", Message)
126+
127+
result = dict(count=0, event=Event())
128+
129+
def callback(msg):
130+
result["count"] += 1
131+
result["event"].set()
132+
133+
sub = Subscriber(topic, callback, transport=tx)
134+
pub = Publisher(topic, transport=tx)
135+
136+
# Subscribe and receive first message
137+
sub.subscribe()
138+
139+
# Small delay to ensure subscriber is ready
140+
import time
141+
time.sleep(0.1)
142+
143+
pub.publish(Message(seq=1))
144+
145+
received = result["event"].wait(timeout=3)
146+
assert received, "First message not received"
147+
assert result["count"] == 1
148+
149+
# Unsubscribe
150+
result["event"].clear()
151+
sub.unsubscribe()
152+
153+
# Publish second message - should not be received
154+
pub.publish(Message(seq=2))
155+
156+
received = result["event"].wait(timeout=1)
157+
assert received is False, "Second message received but it should have been unsubscribed"
158+
assert result["count"] == 1, "Message count should still be 1"
159+
160+
tx.close()

0 commit comments

Comments
 (0)