diff --git a/bindings/rabbitmq/metadata.yaml b/bindings/rabbitmq/metadata.yaml index ae1e2afac7..8fb8c1e2eb 100644 --- a/bindings/rabbitmq/metadata.yaml +++ b/bindings/rabbitmq/metadata.yaml @@ -29,6 +29,12 @@ metadata: description: "RabbitMQ queue name." type: string example: '"myqueue"' + - name: queueType + required: false + description: "RabbitMQ queue type." + type: string + example: '"classic", "quorum"' + default: '"classic"' - name: durable type: bool description: | diff --git a/bindings/rabbitmq/rabbitmq.go b/bindings/rabbitmq/rabbitmq.go index 28ec7cbd0d..053f06c1f3 100644 --- a/bindings/rabbitmq/rabbitmq.go +++ b/bindings/rabbitmq/rabbitmq.go @@ -92,6 +92,7 @@ type rabbitMQMetadata struct { ClientCert string `mapstructure:"clientCert"` ClientKey string `mapstructure:"clientKey"` ExternalSasl bool `mapstructure:"externalSasl"` + QueueType string `mapstructure:"queueType"` } // NewRabbitMQ returns a new rabbitmq instance. diff --git a/bindings/rabbitmq/rabbitmq_integration_test.go b/bindings/rabbitmq/rabbitmq_integration_test.go index 17c288c587..f4b288a641 100644 --- a/bindings/rabbitmq/rabbitmq_integration_test.go +++ b/bindings/rabbitmq/rabbitmq_integration_test.go @@ -447,3 +447,45 @@ func TestPublishWithHeaders(t *testing.T) { // assert.Contains(t, msg.Header, "custom_header1") // assert.Contains(t, msg.Header, "custom_header2") } +func TestQuorumQueue(t *testing.T) { + rabbitmqHost := getTestRabbitMQHost() + assert.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s' (example 'amqp://guest:guest@localhost:5672/')", testRabbitMQHostEnvKey)) + + queueName := uuid.New().String() + durable := true + exclusive := false + + metadata := bindings.Metadata{ + Base: contribMetadata.Base{ + Name: "testQueue", + Properties: map[string]string{ + "queueName": queueName, + "host": rabbitmqHost, + "deleteWhenUnused": strconv.FormatBool(exclusive), + "durable": strconv.FormatBool(durable), + "queueType": "quorum", + }, + }, + } + + logger := logger.NewLogger("test") + + r := NewRabbitMQ(logger).(*RabbitMQ) + err := r.Init(context.Background(), metadata) + require.NoError(t, err) + + // Assert that the queue is created with quorum type + conn, err := amqp.Dial(rabbitmqHost) + require.NoError(t, err) + defer conn.Close() + + ch, err := conn.Channel() + require.NoError(t, err) + defer ch.Close() + + queue, err := ch.QueueDeclarePassive(queueName, durable, false, exclusive, false, amqp.Table{}) + require.NoError(t, err) + assert.Equal(t, "quorum", queue.Arguments["x-queue-type"]) + + require.NoError(t, r.Close()) +} diff --git a/bindings/rabbitmq/rabbitmq_test.go b/bindings/rabbitmq/rabbitmq_test.go index 21353e6603..8fbe177382 100644 --- a/bindings/rabbitmq/rabbitmq_test.go +++ b/bindings/rabbitmq/rabbitmq_test.go @@ -57,6 +57,7 @@ func TestParseMetadata(t *testing.T) { expectedClientKey string expectedCACert string expectedSaslExternal bool + expectedQueueType string }{ { name: "Delete / Durable", @@ -101,6 +102,13 @@ func TestParseMetadata(t *testing.T) { expectedDurable: false, expectedExclusive: true, }, + { + name: "Quorum Queue", + properties: map[string]string{"queueName": queueName, "host": host, "deleteWhenUnused": "false", "durable": "false", "queueType": "quorum"}, + expectedDeleteWhenUnused: false, + expectedDurable: false, + expectedQueueType: "quorum", + }, { name: "With maxPriority", properties: map[string]string{"queueName": queueName, "host": host, "deleteWhenUnused": "false", "durable": "false", "maxPriority": "1"}, @@ -158,6 +166,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, tt.expectedClientKey, r.metadata.ClientKey) assert.Equal(t, tt.expectedCACert, r.metadata.CaCert) assert.Equal(t, tt.expectedSaslExternal, r.metadata.ExternalSasl) + assert.Equal(t, tt.expectedQueueType, r.metadata.QueueType) if tt.expectedReconnectWaitCheck != nil { assert.True(t, tt.expectedReconnectWaitCheck(r.metadata.ReconnectWait)) } diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go index 6b6e6fc7b1..c8c48657f2 100644 --- a/pubsub/rabbitmq/metadata.go +++ b/pubsub/rabbitmq/metadata.go @@ -52,6 +52,7 @@ type rabbitmqMetadata struct { SaslExternal bool `mapstructure:"saslExternal"` Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"` DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"` + QueueType string `mapstructure:"queueType"` } const ( @@ -82,6 +83,7 @@ const ( metadataClientNameKey = "clientName" metadataHeartBeatKey = "heartBeat" metadataQueueNameKey = "queueName" + metadataQueueType = "queueType" defaultReconnectWaitSeconds = 3 @@ -102,6 +104,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm PublisherConfirm: false, SaslExternal: false, HeartBeat: defaultHeartbeat, + QueueType: amqp.QueueTypeClassic, } // upgrade metadata @@ -158,6 +161,10 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm return &result, fmt.Errorf("%s can only be set to true, when all these properties are set: %s, %s, %s", metadataSaslExternal, pubsub.CACert, pubsub.ClientCert, pubsub.ClientKey) } + if result.QueueType != amqp.QueueTypeClassic && result.QueueType != amqp.QueueTypeQuorum { + return &result, fmt.Errorf("%s invalid RabbitMQ queue type %s", errorMessagePrefix, result.QueueType) + } + result.Concurrency, err = pubsub.Concurrency(pubSubMetadata.Properties) return &result, err }