-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathbaseMq.go
172 lines (153 loc) · 4.78 KB
/
baseMq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package mq
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/astaxie/beego"
"github.com/streadway/amqp"
)
type MqConnection struct {
Lock sync.RWMutex
Connection *amqp.Connection
MqUri string
}
type ChannelContext struct {
Exchange string
ExchangeType string
RoutingKey string
Reliable bool
Durable bool
ChannelId string
Channel *amqp.Channel
}
type BaseMq struct {
MqConnection *MqConnection
//channel cache
ChannelContexts map[string]*ChannelContext
}
func (bmq *BaseMq) Init() {
bmq.ChannelContexts = make(map[string]*ChannelContext)
}
// One would typically keep a channel of publishings, a sequence number, and a
// set of unacknowledged sequence numbers and loop until the publishing channel
// is closed.
func (bmq *BaseMq) confirmOne(confirms <-chan amqp.Confirmation) {
beego.Info("waiting for confirmation of one publishing")
if confirmed := <-confirms; confirmed.Ack {
beego.Info("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
beego.Error("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}
/*
func (bmq *BaseMq) getMqUri() string {
return "amqp://" + bmq.MqConnection.User + ":" + bmq.MqConnection.PassWord + "@" + bmq.MqConnection.Host + ":" + bmq.MqConnection.Port + "/"
}
*/
/*
get md5 from channel context
*/
func (bmq *BaseMq) generateChannelId(channelContext *ChannelContext) string {
stringTag := channelContext.Exchange + ":" + channelContext.ExchangeType + ":" + channelContext.RoutingKey + ":" +
strconv.FormatBool(channelContext.Durable) + ":" + strconv.FormatBool(channelContext.Reliable)
hasher := md5.New()
hasher.Write([]byte(stringTag))
return hex.EncodeToString(hasher.Sum(nil))
}
/*
1. use old connection to generate channel
2. update connection then channel
*/
func (bmq *BaseMq) refreshConnectionAndChannel(channelContext *ChannelContext) error {
bmq.MqConnection.Lock.Lock()
defer bmq.MqConnection.Lock.Unlock()
var err error
if bmq.MqConnection.Connection != nil {
channelContext.Channel, err = bmq.MqConnection.Connection.Channel()
} else {
fmt.Println("connection not init,dial first time..")
err = errors.New("connection nil")
}
// reconnect connection
if err != nil {
for {
bmq.MqConnection.Connection, err = amqp.Dial(bmq.MqConnection.MqUri)
if err != nil {
fmt.Println("connect mq get connection error,retry..." + bmq.MqConnection.MqUri)
time.Sleep(10 * time.Second)
} else {
channelContext.Channel, _ = bmq.MqConnection.Connection.Channel()
break
}
}
}
if err = channelContext.Channel.ExchangeDeclare(
channelContext.Exchange, // name
channelContext.ExchangeType, // type
channelContext.Durable, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
); err != nil {
fmt.Println("channel exchange deflare failed refreshConnectionAndChannel again", err)
return err
}
// Reliable publisher confirms require confirm.select support from the
// connection.
/*if channelContext.Reliable {
fmt.Println("enabling publishing confirms.")
if err := channelContext.Channel.Confirm(false); err != nil {
fmt.Println("Channel could not be put into confirm mode: %s", err)
return err
}
fmt.Println("confirm begin")
confirms := channelContext.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
fmt.Println("confirm end")
defer bmq.confirmOne(confirms)
}*/
//add channel to channel cache
bmq.ChannelContexts[channelContext.ChannelId] = channelContext
return nil
}
/*
publish message
*/
func (bmq *BaseMq) Publish(channelContext *ChannelContext, body string) error {
channelContext.ChannelId = bmq.generateChannelId(channelContext)
if bmq.ChannelContexts[channelContext.ChannelId] == nil {
bmq.refreshConnectionAndChannel(channelContext)
} else {
channelContext = bmq.ChannelContexts[channelContext.ChannelId]
}
fmt.Println("declared Exchange, publishing %dB body (%q)", len(body), body)
beego.Info("declared Exchange, publishing %dB body (%q)", len(body), body)
for {
if err := channelContext.Channel.Publish(
channelContext.Exchange, // publish to an exchange
channelContext.RoutingKey, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
fmt.Println("send message failed refresh connection")
time.Sleep(10 * time.Second)
bmq.refreshConnectionAndChannel(channelContext)
} else {
fmt.Println("send messsage succes")
}
}
return nil
}