Skip to content

Commit 4636294

Browse files
authored
add docs for mqtt-sink (#1032)
1 parent 3d9938a commit 4636294

File tree

2 files changed

+132
-0
lines changed

2 files changed

+132
-0
lines changed

docs/build/build.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
"quixstreams.sinks.community.iceberg",
138138
"quixstreams.sinks.community.kinesis",
139139
"quixstreams.sinks.community.mongodb",
140+
"quixstreams.sinks.community.mqtt",
140141
"quixstreams.sinks.community.neo4j",
141142
"quixstreams.sinks.community.postgresql",
142143
"quixstreams.sinks.community.pubsub",

docs/connectors/sinks/mqtt-sink.md

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# MQTT Sink
2+
3+
!!! info
4+
5+
This is a **Community** connector. Test it before using in production.
6+
7+
To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
8+
9+
This sink writes data to an MQTT broker. The sink publishes messages to MQTT topics based on the Kafka message key and preserves message ordering within each topic.
10+
11+
## How To Install
12+
13+
To use MQTTSink, you need to install the required dependencies:
14+
15+
```bash
16+
pip install quixstreams[mqtt]
17+
```
18+
19+
## How It Works
20+
21+
`MQTTSink` is a streaming sink that publishes messages to an MQTT broker.
22+
23+
Messages can optionally be retained and include MQTT properties.
24+
25+
For each message:
26+
27+
- The value is serialized (defaults to JSON)
28+
- The key is converted to a string and used as the topic suffix
29+
- It is published to `{topic_root}/{key}`
30+
31+
## How To Use
32+
33+
Create an instance of `MQTTSink` and pass it to the `StreamingDataFrame.sink()` method:
34+
35+
```python
36+
from quixstreams import Application
37+
from quixstreams.sinks.community.mqtt import MQTTSink
38+
39+
app = Application(broker_address="localhost:9092")
40+
topic = app.topic("topic-name")
41+
42+
# Configure the sink
43+
mqtt_sink = MQTTSink(
44+
client_id="my-mqtt-publisher",
45+
server="mqtt.broker.com",
46+
port=1883,
47+
topic_root="sensors",
48+
username="your_username",
49+
password="your_password",
50+
)
51+
52+
sdf = app.dataframe(topic=topic)
53+
sdf.sink(mqtt_sink)
54+
55+
if __name__ == "__main__":
56+
app.run()
57+
```
58+
59+
## Configuration Options
60+
61+
### Required:
62+
63+
- `client_id`: MQTT client identifier
64+
- `server`: MQTT broker server address
65+
- `port`: MQTT broker server port
66+
- `topic_root`: Root topic to publish messages to
67+
68+
### Optional:
69+
70+
- `username`: Username for MQTT broker authentication.
71+
- `password`: Password for MQTT broker authentication.
72+
- `version`: MQTT protocol version ("3.1", "3.1.1", or "5").
73+
**Default**: `"3.1.1"`
74+
- `tls_enabled`: Whether to use TLS encryption.
75+
**Default**: `True`
76+
- `key_serializer`: Function to serialize message keys to string.
77+
**Default**: `bytes.decode`
78+
- `value_serializer`: Function to serialize message values.
79+
**Default**: `json.dumps`
80+
- `qos`: Quality of Service level (0 or 1; 2 not yet supported).
81+
**Default**: `1`
82+
- `mqtt_flush_timeout_seconds`: How long to wait for publish acknowledgment before failing.
83+
**Default**: `10`
84+
- `retain`: Whether to retain messages for new subscribers. Can be a boolean or callable.
85+
**Default**: `False`
86+
- `properties`: MQTT properties (MQTT v5 only). Can be a Properties instance or callable.
87+
- `on_client_connect_success`: Optional callback for successful client authentication.
88+
- `on_client_connect_failure`: Optional callback for failed client authentication.
89+
90+
## Error Handling and Delivery Guarantees
91+
92+
The sink provides delivery guarantees based on the configured QoS level:
93+
94+
- **QoS 0**: At most once delivery; messages are published without MQTT broker acknowledgment
95+
- **QoS 1**: At least once delivery; messages are published with MQTT broker acknowledgment
96+
97+
During checkpointing, the sink waits for all pending publish acknowledgments to complete:
98+
99+
- The wait time is controlled by `mqtt_flush_timeout_seconds` parameter
100+
- If any messages fail to publish within the flush timeout, an error is raised
101+
- When errors occur:
102+
- The application will retry the entire batch from the last successful offset
103+
- Some messages that were successfully published in the failed batch may be published again
104+
- This ensures no messages are lost, but some might be delivered more than once
105+
106+
This behavior makes the sink reliable but downstream systems should be prepared to handle duplicate messages.
107+
108+
## Testing Locally
109+
110+
You can test `MQTTSink` locally using a local MQTT broker like Mosquitto:
111+
112+
1. Run Mosquitto with custom config:
113+
114+
```bash
115+
# Create mosquitto config and run container
116+
docker run --rm -d --name mosquitto -p 1883:1883 -p 9001:9001 \
117+
--entrypoint sh eclipse-mosquitto \
118+
-c 'echo -e "listener 1883\nallow_anonymous true" > /mosquitto/config/mosquitto.conf && exec /usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf'
119+
```
120+
121+
2. Configure `MQTTSink` to connect to it:
122+
123+
```python
124+
mqtt_sink = MQTTSink(
125+
client_id="test-publisher",
126+
server="localhost",
127+
port=1883,
128+
topic_root="test",
129+
tls_enabled=False, # Disable TLS for local testing
130+
)
131+
```

0 commit comments

Comments
 (0)