Skip to content

Commit b6c8c97

Browse files
rishi.subburaman@sticsoftsolutions.comsonicboom15
authored andcommitted
Added Quorum Queue Support
Signed-off-by: [email protected] <Rishi Kumar S> Signed-off-by: Rishi Kumar S <[email protected]>
1 parent 1132db5 commit b6c8c97

File tree

4 files changed

+59
-0
lines changed

4 files changed

+59
-0
lines changed

bindings/rabbitmq/rabbitmq.go

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type rabbitMQMetadata struct {
9292
ClientCert string `mapstructure:"clientCert"`
9393
ClientKey string `mapstructure:"clientKey"`
9494
ExternalSasl bool `mapstructure:"externalSasl"`
95+
QueueType string `mapstructure:"queueType"`
9596
}
9697

9798
// NewRabbitMQ returns a new rabbitmq instance.

bindings/rabbitmq/rabbitmq_integration_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -447,3 +447,45 @@ func TestPublishWithHeaders(t *testing.T) {
447447
// assert.Contains(t, msg.Header, "custom_header1")
448448
// assert.Contains(t, msg.Header, "custom_header2")
449449
}
450+
func TestQuorumQueue(t *testing.T) {
451+
rabbitmqHost := getTestRabbitMQHost()
452+
assert.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s' (example 'amqp://guest:guest@localhost:5672/')", testRabbitMQHostEnvKey))
453+
454+
queueName := uuid.New().String()
455+
durable := true
456+
exclusive := false
457+
458+
metadata := bindings.Metadata{
459+
Base: contribMetadata.Base{
460+
Name: "testQueue",
461+
Properties: map[string]string{
462+
"queueName": queueName,
463+
"host": rabbitmqHost,
464+
"deleteWhenUnused": strconv.FormatBool(exclusive),
465+
"durable": strconv.FormatBool(durable),
466+
"queueType": "quorum",
467+
},
468+
},
469+
}
470+
471+
logger := logger.NewLogger("test")
472+
473+
r := NewRabbitMQ(logger).(*RabbitMQ)
474+
err := r.Init(context.Background(), metadata)
475+
require.NoError(t, err)
476+
477+
// Assert that the queue is created with quorum type
478+
conn, err := amqp.Dial(rabbitmqHost)
479+
require.NoError(t, err)
480+
defer conn.Close()
481+
482+
ch, err := conn.Channel()
483+
require.NoError(t, err)
484+
defer ch.Close()
485+
486+
queue, err := ch.QueueDeclarePassive(queueName, durable, false, exclusive, false, amqp.Table{})
487+
require.NoError(t, err)
488+
assert.Equal(t, "quorum", queue.Arguments["x-queue-type"])
489+
490+
require.NoError(t, r.Close())
491+
}

bindings/rabbitmq/rabbitmq_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func TestParseMetadata(t *testing.T) {
5757
expectedClientKey string
5858
expectedCACert string
5959
expectedSaslExternal bool
60+
expectedQueueType string
6061
}{
6162
{
6263
name: "Delete / Durable",
@@ -101,6 +102,13 @@ func TestParseMetadata(t *testing.T) {
101102
expectedDurable: false,
102103
expectedExclusive: true,
103104
},
105+
{
106+
name: "Quorum Queue",
107+
properties: map[string]string{"queueName": queueName, "host": host, "deleteWhenUnused": "false", "durable": "false", "queueType": "quorum"},
108+
expectedDeleteWhenUnused: false,
109+
expectedDurable: false,
110+
expectedQueueType: "quorum",
111+
},
104112
{
105113
name: "With maxPriority",
106114
properties: map[string]string{"queueName": queueName, "host": host, "deleteWhenUnused": "false", "durable": "false", "maxPriority": "1"},
@@ -158,6 +166,7 @@ func TestParseMetadata(t *testing.T) {
158166
assert.Equal(t, tt.expectedClientKey, r.metadata.ClientKey)
159167
assert.Equal(t, tt.expectedCACert, r.metadata.CaCert)
160168
assert.Equal(t, tt.expectedSaslExternal, r.metadata.ExternalSasl)
169+
assert.Equal(t, tt.expectedQueueType, r.metadata.QueueType)
161170
if tt.expectedReconnectWaitCheck != nil {
162171
assert.True(t, tt.expectedReconnectWaitCheck(r.metadata.ReconnectWait))
163172
}

pubsub/rabbitmq/metadata.go

+7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type rabbitmqMetadata struct {
5252
SaslExternal bool `mapstructure:"saslExternal"`
5353
Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
5454
DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"`
55+
QueueType string `mapstructure:"queueType"`
5556
}
5657

5758
const (
@@ -82,6 +83,7 @@ const (
8283
metadataClientNameKey = "clientName"
8384
metadataHeartBeatKey = "heartBeat"
8485
metadataQueueNameKey = "queueName"
86+
metadataQueueType = "queueType"
8587

8688
defaultReconnectWaitSeconds = 3
8789

@@ -102,6 +104,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm
102104
PublisherConfirm: false,
103105
SaslExternal: false,
104106
HeartBeat: defaultHeartbeat,
107+
QueueType: amqp.QueueTypeClassic,
105108
}
106109

107110
// upgrade metadata
@@ -158,6 +161,10 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm
158161
return &result, fmt.Errorf("%s can only be set to true, when all these properties are set: %s, %s, %s", metadataSaslExternal, pubsub.CACert, pubsub.ClientCert, pubsub.ClientKey)
159162
}
160163

164+
if result.QueueType != amqp.QueueTypeClassic && result.QueueType != amqp.QueueTypeQuorum {
165+
return &result, fmt.Errorf("%s invalid RabbitMQ queue type %s", errorMessagePrefix, result.QueueType)
166+
}
167+
161168
result.Concurrency, err = pubsub.Concurrency(pubSubMetadata.Properties)
162169
return &result, err
163170
}

0 commit comments

Comments
 (0)