Skip to content

Commit 5ddd599

Browse files
committed
address comments
1 parent 7f02246 commit 5ddd599

File tree

5 files changed

+59
-77
lines changed

5 files changed

+59
-77
lines changed

discovery/main.go

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package discovery
22

33
import (
4-
"fmt"
5-
6-
"github.com/miekg/dns"
4+
"net"
75
)
86

97
type Endpoint struct {
@@ -13,50 +11,18 @@ type Endpoint struct {
1311

1412
func DiscoverEndpoints(serviceDNS string) ([]Endpoint, error) {
1513
endpoints := []Endpoint{}
16-
qType := dns.StringToType["SRV"]
17-
serviceDNS = dns.Fqdn(serviceDNS)
18-
19-
client := &dns.Client{}
20-
msg := &dns.Msg{}
21-
22-
msg.SetQuestion(serviceDNS, qType)
23-
24-
dnsserver, err := getDNSServer()
25-
if err != nil {
26-
return endpoints, err
27-
}
14+
_, remotes, err := net.LookupSRV("", "", serviceDNS)
2815

29-
response, _, err := client.Exchange(msg, dnsserver)
3016
if err != nil {
3117
return endpoints, err
3218
}
3319

34-
if msg.Id != response.Id {
35-
return nil, fmt.Errorf("DNS ID mismatch, request: %d, response: %d", msg.Id, response.Id)
36-
}
37-
38-
for _, v := range response.Answer {
39-
if srv, ok := v.(*dns.SRV); ok {
40-
endpoints = append(endpoints, Endpoint{
41-
Name: srv.Target,
42-
Port: srv.Port,
43-
})
44-
}
20+
for _, n := range remotes {
21+
endpoints = append(endpoints, Endpoint{
22+
Name: n.Target,
23+
Port: n.Port,
24+
})
4525
}
4626

4727
return endpoints, nil
4828
}
49-
50-
func getDNSServer() (string, error) {
51-
servers, err := parseResolvConf()
52-
return servers[0], err
53-
}
54-
55-
func parseResolvConf() ([]string, error) {
56-
conf, err := dns.ClientConfigFromFile("/etc/resolv.conf")
57-
if err != nil {
58-
return []string{}, err
59-
}
60-
61-
return conf.Servers, nil
62-
}

glide.lock

Lines changed: 7 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glide.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import:
77
- package: github.com/kelseyhightower/envconfig
88
version: v1.3.0
99
- package: github.com/nats-io/nats
10-
version: v1.2.2
10+
version: v1.3.0
1111
- package: github.com/pkg/errors
1212
version: v0.8.0
1313
- package: github.com/rybit/nats_logrus_hook
@@ -23,7 +23,6 @@ import:
2323
- package: gopkg.in/mgo.v2
2424
- package: github.com/BurntSushi/toml
2525
version: v0.3.0
26-
- package: github.com/miekg/dns
2726
testImport:
2827
- package: github.com/nats-io/gnatsd
2928
version: v0.9.6

messaging/nats.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414
)
1515

1616
type NatsConfig struct {
17-
TLS *tls.Config `mapstructure:"tls_conf"`
18-
ServiceDNSName string `mapstructure:"service_dnsname"`
19-
Servers []string `mapstructure:"servers"`
20-
LogsSubject string `mapstructure:"log_subject"`
17+
TLS *tls.Config `mapstructure:"tls_conf"`
18+
DiscoveryName string `mapstructure:"discovery_name"`
19+
Servers []string `mapstructure:"servers"`
20+
LogsSubject string `mapstructure:"log_subject"`
2121
}
2222

2323
type MetricsConfig struct {
@@ -66,21 +66,14 @@ func ConfigureNatsConnection(config *NatsConfig, log *logrus.Entry) (*nats.Conn,
6666

6767
// ConnectToNats will do a TLS connection to the nats servers specified
6868
func ConnectToNats(config *NatsConfig, errHandler nats.ErrHandler) (*nats.Conn, error) {
69+
var err error
6970
serversString := config.ServerString()
7071

71-
if config.ServiceDNSName != "" {
72-
natsUrls := []string{}
73-
74-
endpoints, err := discovery.DiscoverEndpoints(config.ServiceDNSName)
72+
if config.DiscoveryName != "" {
73+
serversString, err = buildNatsServersString(config.DiscoveryName)
7574
if err != nil {
7675
return nil, err
7776
}
78-
79-
for _, endpoint := range endpoints {
80-
natsUrls = append(natsUrls, fmt.Sprintf("nats://%s:%d", endpoint.Name, endpoint.Port))
81-
}
82-
83-
serversString = strings.Join(natsUrls, ",")
8477
}
8578

8679
options := []nats.Option{}
@@ -111,3 +104,18 @@ func ErrorHandler(log *logrus.Entry) nats.ErrHandler {
111104
}).Error("Error while consuming from " + sub.Subject)
112105
}
113106
}
107+
108+
func buildNatsServersString(serviceName string) (string, error) {
109+
natsUrls := []string{}
110+
111+
endpoints, err := discovery.DiscoverEndpoints(serviceName)
112+
if err != nil {
113+
return "", err
114+
}
115+
116+
for _, endpoint := range endpoints {
117+
natsUrls = append(natsUrls, fmt.Sprintf("nats://%s:%d", endpoint.Name, endpoint.Port))
118+
}
119+
120+
return strings.Join(natsUrls, ","), nil
121+
}

messaging/rabbit.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ func (c *Consumer) Clone(queueName string, delivery *DeliveryDefinition) (*Consu
4141
}
4242

4343
type RabbitConfig struct {
44-
Servers []string `mapstructure:"servers"`
45-
ServiceDNSName string `mapstructure:"service_dnsname"`
46-
TLS *tls.Config `mapstructure:"tls_conf"`
44+
Servers []string `mapstructure:"servers"`
45+
DiscoveryName string `mapstructure:"discovery_name"`
46+
TLS *tls.Config `mapstructure:"tls_conf"`
4747

4848
ExchangeDefinition ExchangeDefinition `envconfig:"exchange" mapstructure:"exchange"`
4949
QueueDefinition QueueDefinition `envconfig:"queue" mapstructure:"queue"`
@@ -206,21 +206,17 @@ func ValidateRabbitConfigStruct(servers []string, exchange ExchangeDefinition, q
206206

207207
// ConnectToRabbit will open a TLS connection to rabbit mq
208208
func ConnectToRabbit(config *RabbitConfig, log *logrus.Entry) (*Consumer, error) {
209-
if err := ValidateRabbitConfig(config); err != nil {
209+
var err error
210+
if err = ValidateRabbitConfig(config); err != nil {
210211
return nil, err
211212
}
212213

213214
servers := config.Servers
214-
if config.ServiceDNSName != "" {
215-
servers = []string{}
216-
endpoints, err := discovery.DiscoverEndpoints(config.ServiceDNSName)
215+
if config.DiscoveryName != "" {
216+
servers, err = discoverRabbitServers(config.DiscoveryName)
217217
if err != nil {
218218
return nil, err
219219
}
220-
221-
for _, endpoint := range endpoints {
222-
servers = append(servers, fmt.Sprintf("%s:%d", endpoint.Name, endpoint.Port))
223-
}
224220
}
225221

226222
conn, err := DialToRabbit(servers, config.TLS, log)
@@ -409,6 +405,21 @@ func Consume(channel *amqp.Channel, deliveryDef *DeliveryDefinition) (<-chan amq
409405
)
410406
}
411407

408+
func discoverRabbitServers(serviceName string) ([]string, error) {
409+
rabbitUrls := []string{}
410+
411+
endpoints, err := discovery.DiscoverEndpoints(serviceName)
412+
if err != nil {
413+
return rabbitUrls, err
414+
}
415+
416+
for _, endpoint := range endpoints {
417+
rabbitUrls = append(rabbitUrls, fmt.Sprintf("%s:%d", endpoint.Name, endpoint.Port))
418+
}
419+
420+
return rabbitUrls, nil
421+
}
422+
412423
// ----------------------------------------------------------------------------
413424
// utils
414425
// ----------------------------------------------------------------------------

0 commit comments

Comments
 (0)