@@ -43,9 +43,9 @@ type RabbitConfig struct {
43
43
Servers []string `mapstructure:"servers"`
44
44
TLS * tls.Config `mapstructure:"tls_conf"`
45
45
46
- ExchangeDefinition ExchangeDefinition `mapstructure:"exchange"`
47
- QueueDefinition QueueDefinition `mapstructure:"queue"`
48
- DeliveryDefinition * DeliveryDefinition `mapstructure:"delivery"`
46
+ ExchangeDefinition ExchangeDefinition `envconfig:"exchange" mapstructure:"exchange"`
47
+ QueueDefinition QueueDefinition `envconfig:"queue" mapstructure:"queue"`
48
+ DeliveryDefinition * DeliveryDefinition `envconfig:"delivery" mapstructure:"delivery"`
49
49
}
50
50
51
51
// ExchangeDefinition defines all the parameters for an exchange
@@ -222,18 +222,22 @@ func DialToRabbit(servers []string, tls *tls.Config, log *logrus.Entry) (*amqp.C
222
222
Heartbeat : 10 * time .Second ,
223
223
}
224
224
225
- fields := logrus.Fields {}
225
+ fields := logrus.Fields {
226
+ "servers" : strings .Join (servers , "," ),
227
+ }
226
228
if tls != nil {
227
- fields ["cert_file" ] = tls .CertFile
228
- fields ["key_file" ] = tls .KeyFile
229
- fields ["ca_files" ] = tls .CAFiles
230
-
231
229
tlsConfig , err := tls .TLSConfig ()
232
230
if err != nil {
233
231
return nil , err
234
232
}
235
233
236
- dialConfig .TLSClientConfig = tlsConfig
234
+ if tlsConfig != nil {
235
+ fields ["cert_file" ] = tls .CertFile
236
+ fields ["key_file" ] = tls .KeyFile
237
+ fields ["ca_files" ] = tls .CAFiles
238
+ log .WithFields (fields ).Debug ("Forcing TLS connection" )
239
+ dialConfig .TLSClientConfig = tlsConfig
240
+ }
237
241
}
238
242
239
243
log .WithFields (fields ).Info ("Dialing rabbitmq servers" )
@@ -247,9 +251,12 @@ func DialToRabbit(servers []string, tls *tls.Config, log *logrus.Entry) (*amqp.C
247
251
248
252
// CreateChannel initializes a new message channel.
249
253
func CreateChannel (conn * amqp.Connection , exchange ExchangeDefinition , queue QueueDefinition , log * logrus.Entry ) (* amqp.Channel , error ) {
254
+ log .Debugf ("Original exchange definition: %s" , exchange .JSON ())
250
255
ed := NewExchangeDefinition (exchange .Name , exchange .Type )
251
256
ed .merge (& exchange )
252
257
log .Debugf ("Using exchange definition: %s" , ed .JSON ())
258
+
259
+ log .Debugf ("Original queue definition: %s" , queue .JSON ())
253
260
qd := NewQueueDefinition (queue .Name , queue .BindingKey )
254
261
qd .merge (& queue )
255
262
log .Debugf ("Using queue definition %s" , qd .JSON ())
@@ -272,6 +279,7 @@ func CreateConsumer(conn *amqp.Connection, exchange ExchangeDefinition, queue Qu
272
279
func CreateConsumerOnChannel (conn * amqp.Connection , ch * amqp.Channel , queue QueueDefinition , delivery * DeliveryDefinition , log * logrus.Entry ) (* Consumer , error ) {
273
280
dd := NewDeliveryDefinition (queue .Name )
274
281
if delivery != nil {
282
+ log .Debugf ("Original delivery definition: %s" , delivery .JSON ())
275
283
dd .merge (delivery )
276
284
}
277
285
log .Debugf ("Using delivery definition: %s" , dd .JSON ())
0 commit comments