-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscribe.go
85 lines (73 loc) · 2.51 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package goqtt
import (
"fmt"
"io"
"github.com/rs/zerolog/log"
"github.com/andschneider/goqtt/packets"
)
// Subscribe attempts to create a subscription for a client to its configured topic.
// It sends a SUBSCRIBE packet built from c.Config.topic and then reads one
// response, which must be a SUBACK packet.
//
// It returns an error if the packet cannot be sent, if the response cannot be
// read, or if the response is any packet type other than SUBACK. Send and read
// errors are wrapped with %w, so callers can inspect the underlying cause with
// errors.Is and errors.As.
func (c *Client) Subscribe() error {
	// create packet
	var p packets.SubscribePacket
	p.CreateSubscribePacket(c.Config.topic)

	if err := c.sendPacket(&p); err != nil {
		return fmt.Errorf("could not write Subscribe packet: %w", err)
	}

	// read response and verify it's a SUBACK packet
	r, err := c.readResponse()
	if err != nil {
		return fmt.Errorf("could not read response for %s: %w", p.Name(), err)
	}
	if _, ok := r.(*packets.SubackPacket); !ok {
		// Log the unexpected packet before reporting the mismatch to the caller.
		typeErrorResponseLogger(p.Name(), r.Name(), r)
		return fmt.Errorf("did not receive a SUBACK packet, got %s instead", r.Name())
	}
	return nil
}
// Unsubscribe sends an UNSUBSCRIBE packet for the client's configured topic
// (c.Config.topic) and then reads one response, which must be an UNSUBACK packet.
//
// It returns an error if the packet cannot be sent, if the response cannot be
// read, or if the response is any packet type other than UNSUBACK. Send and
// read errors are wrapped with %w, so callers can inspect the underlying cause
// with errors.Is and errors.As.
func (c *Client) Unsubscribe() error {
	// create packet
	var p packets.UnsubscribePacket
	p.CreateUnsubscribePacket(c.Config.topic)

	if err := c.sendPacket(&p); err != nil {
		return fmt.Errorf("could not write Unsubscribe packet: %w", err)
	}

	// read response and verify it's an UNSUBACK packet
	r, err := c.readResponse()
	if err != nil {
		return fmt.Errorf("could not read response for %s: %w", p.Name(), err)
	}
	if _, ok := r.(*packets.UnsubackPacket); !ok {
		// Log the unexpected packet before reporting the mismatch to the caller.
		typeErrorResponseLogger(p.Name(), r.Name(), r)
		return fmt.Errorf("did not receive an UNSUBACK packet, got %s instead", r.Name())
	}
	// Every failure path has already returned; report success explicitly.
	return nil
}
// ReadLoop should be used after a successful subscription to a topic. Each call
// reads one incoming packet from the client's connection via packets.ReadPacket;
// callers are expected to invoke it repeatedly.
//
// Return values:
//   - (packet, nil) when a PublishPacket was received, for further processing.
//   - (nil, nil) when a PingRespPacket or any other packet type was received;
//     the packet is only logged.
//   - (nil, err) when reading failed. io.EOF is logged as a warning and
//     returned as-is; any other error is logged as an error and returned.
//
// TODO this is basically a wrapper around the ReadPacket function, and might get replaced/modified later.
func (c *Client) ReadLoop() (*packets.PublishPacket, error) {
	p, err := packets.ReadPacket(c.conn)
	if err != nil {
		if err == io.EOF {
			log.Warn().Msg("Looks like the server closed the connection...")
			return nil, err
		}
		log.Error().Err(err).Msg("subscribe loop error")
		// Do not fall through to the type switch: p cannot be relied on when
		// err is non-nil (presumably nil — confirm against ReadPacket), and a
		// nil p would panic on packet.String() in the default branch.
		return nil, err
	}

	// process packets based on type
	switch packet := p.(type) {
	case *packets.PublishPacket:
		return packet, nil
	case *packets.PingRespPacket:
		// expected from the keepAlive, all good
		log.Debug().Str("source", "goqtt").Str("packet", packet.String()).Msg("pingresp received")
		return nil, nil
	default:
		log.Warn().Str("source", "goqtt").Str("packet", packet.String()).Msg("packet type unexpected")
	}
	return nil, nil
}