Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions protocol/amqp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ go 1.18
replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-amqp v1.0.5
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.9.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
Expand Down
18 changes: 6 additions & 12 deletions protocol/amqp/v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand All @@ -19,18 +18,13 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion protocol/amqp/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const prefix = "cloudEvents:" // Name prefix for AMQP properties that hold CE at

var (
// Use the package path as AMQP error condition name
condition = amqp.ErrorCondition(reflect.TypeOf(Message{}).PkgPath())
condition = amqp.ErrCond(reflect.TypeOf(Message{}).PkgPath())
specs = spec.WithPrefix(prefix)
)

Expand Down
35 changes: 25 additions & 10 deletions protocol/amqp/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,53 @@ import (
type Option func(*Protocol) error

// WithConnOpt sets a connection option for amqp
func WithConnOpt(opt amqp.ConnOption) Option {
func WithConnOpts(opt *amqp.ConnOptions) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're using pointer semantics in those options, be careful with races - users might not be aware that you're passing the pointer around (instead of copy) and this could lead to surprises for users. I suggest using value semantic to avoid races and these surprises.

return func(t *Protocol) error {
t.connOpts = append(t.connOpts, opt)
t.connOpts = opt
return nil
}
}

// WithConnSASLPlain sets SASLPlain connection option for amqp
func WithConnSASLPlain(username, password string) Option {
return WithConnOpt(amqp.ConnSASLPlain(username, password))
func WithConnSASLPlain(opt *amqp.ConnOptions, username, password string) Option {
opt.SASLType = amqp.SASLTypePlain(username, password)
return WithConnOpts(opt)
}

// WithSessionOpt sets a session option for amqp
func WithSessionOpt(opt amqp.SessionOption) Option {
func WithSessionOpts(opt *amqp.SessionOptions) Option {
return func(t *Protocol) error {
t.sessionOpts = append(t.sessionOpts, opt)
t.sessionOpts = opt
return nil
}
}

// WithSenderLinkOption sets a link option for amqp
func WithSenderLinkOption(opt amqp.LinkOption) Option {
func WithSenderOpts(opt *amqp.SenderOptions) Option {
return func(t *Protocol) error {
t.senderLinkOpts = append(t.senderLinkOpts, opt)
t.senderOpts = opt
return nil
}
}

// WithReceiverLinkOption sets a link option for amqp
func WithReceiverLinkOption(opt amqp.LinkOption) Option {
func WithReceiverOpts(opt *amqp.ReceiverOptions) Option {
return func(t *Protocol) error {
t.receiverLinkOpts = append(t.receiverLinkOpts, opt)
t.receiverOpts = opt
return nil
}
}

func WithReceiveOpts(opt amqp.ReceiveOptions) Option {
return func(t *Protocol) error {
t.receiveOpts = opt
return nil
}
}

func WithSendOpts(opt *amqp.SendOptions) Option {
return func(t *Protocol) error {
t.sendOpts = opt
return nil
}
}
Expand Down
114 changes: 65 additions & 49 deletions protocol/amqp/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ import (
)

type Protocol struct {
connOpts []amqp.ConnOption
sessionOpts []amqp.SessionOption
senderLinkOpts []amqp.LinkOption
receiverLinkOpts []amqp.LinkOption

// AMQP
Client *amqp.Client
Client *amqp.Conn
Session *amqp.Session
ownedClient bool
Node string

connOpts *amqp.ConnOptions
sessionOpts *amqp.SessionOptions
senderOpts *amqp.SenderOptions
receiverOpts *amqp.ReceiverOptions
sendOpts *amqp.SendOptions
receiveOpts amqp.ReceiveOptions

// Sender
Sender *sender
SenderContextDecorators []func(context.Context) context.Context
Expand All @@ -35,54 +37,61 @@ type Protocol struct {
}

// NewProtocolFromClient creates a new amqp transport.
func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error) {
func NewProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
queue string,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: queue,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: queue,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(queue))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, queue, t.senderOpts)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, t.sendOpts).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(t.Node))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver)
return t, nil
}

// NewProtocol creates a new amqp transport.
func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewProtocol(
ctx context.Context,
server, queue string,
connOptions amqp.ConnOptions,
sessionOptions amqp.SessionOptions,
opts ...Option,
) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewProtocolFromClient(client, session, queue, opts...)
p, err := NewProtocolFromClient(ctx, client, session, queue, opts...)
if err != nil {
return nil, err
}
Expand All @@ -92,69 +101,76 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti
}

// NewSenderProtocolFromClient creates a new amqp sender transport.
func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewSenderProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}
t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(address))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, address, t.senderOpts)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, t.sendOpts).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

return t, nil
}

// NewReceiverProtocolFromClient creates a new receiver amqp transport.
func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewReceiverProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

t.Node = address
t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(address))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, address, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver)
return t, nil
}

// NewSenderProtocol creates a new sender amqp transport.
func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewSenderProtocolFromClient(client, session, address, opts...)
p, err := NewSenderProtocolFromClient(ctx, client, session, address, opts...)
if err != nil {
return nil, err
}
Expand All @@ -164,20 +180,20 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses
}

// NewReceiverProtocol creates a new receiver amqp transport.
func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewReceiverProtocolFromClient(client, session, address, opts...)
p, err := NewReceiverProtocolFromClient(ctx, client, session, address, opts...)

if err != nil {
return nil, err
Expand Down
11 changes: 7 additions & 4 deletions protocol/amqp/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
const serverDown = "session ended by server"

// receiver wraps an amqp.Receiver as a binding.Receiver
type receiver struct{ amqp *amqp.Receiver }
type receiver struct {
amqp *amqp.Receiver
options amqp.ReceiveOptions
}

func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
m, err := r.amqp.Receive(ctx)
m, err := r.amqp.Receive(ctx, &r.options)
if err != nil {
if err == ctx.Err() {
return nil, io.EOF
Expand All @@ -38,6 +41,6 @@ func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
}

// NewReceiver create a new Receiver which wraps an amqp.Receiver in a binding.Receiver
func NewReceiver(amqp *amqp.Receiver) protocol.Receiver {
return &receiver{amqp: amqp}
func NewReceiver(amqp *amqp.Receiver, options amqp.ReceiveOptions) protocol.Receiver {
return &receiver{amqp: amqp, options: options}
}
Loading