forked from TheThingsArchive/python-app-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_ttnmqtt.py
119 lines (92 loc) · 3.4 KB
/
test_ttnmqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# coding: Latin-1
# Copyright © 2018 The Things Network
# Use of this source code is governed by the
# MIT license that can be found in the LICENSE file.
import unittest
import time
import json
from ttn import MQTTClient as mqtt
from ttn.utils import stubs
# Address (host:port) passed as ``mqtt_address`` by the tests that expect a
# successful connection.
MQTT_ADDR = "localhost:1883"
class TestMQTTClient(unittest.TestCase):
    """Integration tests for the TTN ``MQTTClient`` wrapper.

    The tests connect to the address in ``MQTT_ADDR`` using the stub
    application credentials, except ``test_connect_error`` which uses a
    deliberately bad address.

    NOTE(review): the assertions live inside the client callbacks. If the
    client invokes those callbacks on its own network thread, a failing
    assert there will not be reported by unittest -- confirm against the
    client implementation.
    """

    def _new_client(self, address=None):
        """Build an unconnected client from the stub credentials.

        ``address`` defaults to ``MQTT_ADDR`` when not given.
        """
        if address is None:
            address = MQTT_ADDR
        return mqtt(stubs.apptest["appId"],
                    stubs.apptest["accessKey"],
                    mqtt_address=address)

    def _check_downlink(self, *send_args):
        """Connect, send one downlink with ``send_args``, then close.

        The registered downlink callback prints the id it receives and
        asserts that it equals 1.
        """
        def downlinkcallback(mid, client):
            print(mid)
            assert mid == 1

        ttn_client = self._new_client()
        ttn_client.set_downlink_callback(downlinkcallback)
        ttn_client.connect()
        try:
            # Presumably lets the connection settle before sending and
            # leaves time for the callback to fire -- TODO confirm.
            time.sleep(1)
            ttn_client.send(*send_args)
            time.sleep(1)
        finally:
            # Always release the client, even when send() raises.
            ttn_client.close()

    def test_connect_disconnect(self):
        """Connect and close, expecting a truthy result in both callbacks."""
        def connectcallback(res, client):
            print(res)
            assert res

        def closecallback(res, client):
            print(res)
            assert res

        ttn_client = self._new_client()
        ttn_client.set_connect_callback(connectcallback)
        ttn_client.set_close_callback(closecallback)
        ttn_client.connect()
        time.sleep(1)
        ttn_client.close()

    def test_uplink(self):
        """Publish the stub uplink and check the payload seen by the callback."""
        def uplinkcallback(message, client):
            print(message)
            assert message.payload_raw == "AQ=="

        ttn_client = self._new_client()
        ttn_client.set_uplink_callback(uplinkcallback)
        ttn_client.connect()
        try:
            time.sleep(1)
            # Reaches into the name-mangled private attribute to publish
            # the stub uplink directly on the device's "up" topic.
            topic = "{}/devices/guest/up".format(stubs.apptest["appId"])
            ttn_client._MQTTClient__client.publish(
                topic, json.dumps(stubs.uplink))
            time.sleep(1)
        finally:
            # Always release the client, even when publish() raises.
            ttn_client.close()

    def test_connect_error(self):
        """Connect to a bad address; the callback expects ``res`` to be False."""
        def connectcallback(res, client):
            print(res)
            assert res is False

        ttn_client = self._new_client("badAddress:5555")
        ttn_client.set_connect_callback(connectcallback)
        try:
            ttn_client.connect()
        except Exception:
            # A connection failure is tolerated here. Only Exception is
            # caught so KeyboardInterrupt/SystemExit still propagate.
            pass
        finally:
            # Close whether or not connect() raised, so no client is left
            # open when the failure is reported without an exception.
            ttn_client.close()

    def test_downlink_payloadraw(self):
        """Send a downlink whose payload is the string ``"AQ=="``."""
        self._check_downlink("guest", "AQ==")

    def test_downlink_payloadfields(self):
        """Send a downlink whose payload is a dict of fields."""
        self._check_downlink("guest", {"field1": 1, "field2": 2})

    def test_providing_all_downlink_options(self):
        """Send a downlink passing every positional option explicitly."""
        # Presumably port, confirmation flag and schedule -- verify against
        # MQTTClient.send.
        self._check_downlink("guest", "AQ==", 2, True, "first")