diff --git a/bindings/rabbitmq/rabbitmq.go b/bindings/rabbitmq/rabbitmq.go index 28ec7cbd0d..e06089e2bc 100644 --- a/bindings/rabbitmq/rabbitmq.go +++ b/bindings/rabbitmq/rabbitmq.go @@ -31,6 +31,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/dapr/components-contrib/bindings" + common "github.com/dapr/components-contrib/common/component/rabbitmq" "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" kitmd "github.com/dapr/kit/metadata" @@ -228,11 +229,6 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi pub.Headers[k] = v } - contentType, ok := metadata.TryGetContentType(req.Metadata) - if ok { - pub.ContentType = contentType - } - // The default time to live has been set in the queue // We allow overriding on each call, by setting a value in request metadata ttl, ok, err := metadata.TryGetTTL(req.Metadata) @@ -252,6 +248,8 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi pub.Priority = priority } + common.ApplyMetadataToPublishing(req.Metadata, &pub) + err = ch.PublishWithContext(ctx, "", r.metadata.QueueName, false, false, pub) if err != nil { return nil, fmt.Errorf("failed to publish message: %w", err) diff --git a/bindings/rabbitmq/rabbitmq_integration_test.go b/bindings/rabbitmq/rabbitmq_integration_test.go index dcc380c232..def3d8513e 100644 --- a/bindings/rabbitmq/rabbitmq_integration_test.go +++ b/bindings/rabbitmq/rabbitmq_integration_test.go @@ -447,3 +447,67 @@ func TestPublishWithHeaders(t *testing.T) { // assert.Contains(t, msg.Header, "custom_header1") // assert.Contains(t, msg.Header, "custom_header2") } + +func TestPublishMetadataProperties(t *testing.T) { + rabbitmqHost := getTestRabbitMQHost() + require.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s'", testRabbitMQHostEnvKey)) + + queueName := uuid.New().String() + durable := true + exclusive := false + + metadata := bindings.Metadata{ + Base: contribMetadata.Base{ + Name: "testQueue", + Properties: map[string]string{ + "queueName": queueName, + "host": rabbitmqHost, + "deleteWhenUnused": strconv.FormatBool(exclusive), + "durable": strconv.FormatBool(durable), + }, + }, + } + + logger := logger.NewLogger("test") + r := NewRabbitMQ(logger).(*RabbitMQ) + err := r.Init(t.Context(), metadata) + require.NoError(t, err) + + conn, err := amqp.Dial(rabbitmqHost) + require.NoError(t, err) + defer conn.Close() + + ch, err := conn.Channel() + require.NoError(t, err) + defer ch.Close() + + const messageData = "test message" + const msgID = "msg-123" + const corrID = "corr-456" + const msgType = "testType" + const contentType = "application/json" + + writeRequest := bindings.InvokeRequest{ + Data: []byte(messageData), + Metadata: map[string]string{ + "messageID": msgID, + "correlationID": corrID, + "type": msgType, + "contentType": contentType, + }, + } + _, err = r.Invoke(t.Context(), &writeRequest) + require.NoError(t, err) + + // Retrieve the message. + msg, ok, err := getMessageWithRetries(ch, queueName, 2*time.Second) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, messageData, string(msg.Body)) + assert.Equal(t, msgID, msg.MessageId) + assert.Equal(t, corrID, msg.CorrelationId) + assert.Equal(t, msgType, msg.Type) + assert.Equal(t, contentType, msg.ContentType) + + require.NoError(t, r.Close()) +} diff --git a/common/component/rabbitmq/rabbitmq.go b/common/component/rabbitmq/rabbitmq.go new file mode 100644 index 0000000000..c546a76f30 --- /dev/null +++ b/common/component/rabbitmq/rabbitmq.go @@ -0,0 +1,70 @@ +package rabbitmq + +import ( + "strings" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// tryGetProperty checks for a property value using various key formats: original, camelCase, and case-insensitive +func tryGetProperty(props map[string]string, keys ...string) (string, bool) { + // First try exact match for all provided keys + for _, key := range keys { + if val, ok := props[key]; ok && val != "" { + return val, true + } + } + + // Then try case-insensitive match if no exact matches were found + for k, v := range props { + if v != "" { + lowerK := strings.ToLower(k) + for _, key := range keys { + if strings.ToLower(key) == lowerK { + return v, true + } + } + } + } + + return "", false +} + +func TryGetMessageID(props map[string]string) (string, bool) { + return tryGetProperty(props, "messageId", "messageID", "MessageId", "MessageID") +} + +func TryGetCorrelationID(props map[string]string) (string, bool) { + return tryGetProperty(props, "correlationId", "correlationID", "CorrelationId", "CorrelationID") +} + +func TryGetContentType(props map[string]string) (string, bool) { + return tryGetProperty(props, "contentType", "ContentType") +} + +func TryGetType(props map[string]string) (string, bool) { + return tryGetProperty(props, "type", "Type") +} + +// ApplyMetadataToPublishing applies common metadata fields to an AMQP publishing +func ApplyMetadataToPublishing(metadata map[string]string, publishing *amqp.Publishing) { + contentType, ok := TryGetContentType(metadata) + if ok { + publishing.ContentType = contentType + } + + messageID, ok := TryGetMessageID(metadata) + if ok { + publishing.MessageId = messageID + } + + correlationID, ok := TryGetCorrelationID(metadata) + if ok { + publishing.CorrelationId = correlationID + } + + aType, ok := TryGetType(metadata) + if ok { + publishing.Type = aType + } +} diff --git a/metadata/utils.go b/metadata/utils.go index 6c2ddd445d..544f104ad2 100644 --- a/metadata/utils.go +++ b/metadata/utils.go @@ -114,14 +114,6 @@ func IsRawPayload(props map[string]string) (bool, error) { return false, nil } -func TryGetContentType(props map[string]string) (string, bool) { - if val, ok := props[ContentType]; ok && val != "" { - return val, true - } - - return "", false -} - func TryGetQueryIndexName(props map[string]string) (string, bool) { if val, ok := props[QueryIndexName]; ok && val != "" { return val, true diff --git a/metadata/utils_test.go b/metadata/utils_test.go index 44c09e2bb0..0f8f6de7b8 100644 --- a/metadata/utils_test.go +++ b/metadata/utils_test.go @@ -175,34 +175,6 @@ func TestIsRawPayload(t *testing.T) { }) } -func TestTryGetContentType(t *testing.T) { - t.Run("Metadata without content type", func(t *testing.T) { - val, ok := TryGetContentType(map[string]string{}) - - assert.Equal(t, "", val) - assert.False(t, ok) - }) - - t.Run("Metadata with empty content type", func(t *testing.T) { - val, ok := TryGetContentType(map[string]string{ - "contentType": "", - }) - - assert.Equal(t, "", val) - assert.False(t, ok) - }) - - t.Run("Metadata with corrent content type", func(t *testing.T) { - const contentType = "application/cloudevent+json" - val, ok := TryGetContentType(map[string]string{ - "contentType": contentType, - }) - - assert.Equal(t, contentType, val) - assert.True(t, ok) - }) -} - func TestMetadataStructToStringMap(t *testing.T) { t.Run("Test metadata struct to metadata info conversion", func(t *testing.T) { type NestedStruct struct { diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index b696082ed5..83d24b2b32 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -28,6 +28,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" + common "github.com/dapr/components-contrib/common/component/rabbitmq" "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -259,6 +260,8 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest) p.Priority = priority } + common.ApplyMetadataToPublishing(req.Metadata, &p) + confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p) if err != nil { r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err) diff --git a/pubsub/rabbitmq/rabbitmq_test.go b/pubsub/rabbitmq/rabbitmq_test.go index a8be105b49..8ae2d13051 100644 --- a/pubsub/rabbitmq/rabbitmq_test.go +++ b/pubsub/rabbitmq/rabbitmq_test.go @@ -461,10 +461,11 @@ func createAMQPMessage(body []byte) amqp.Delivery { } type rabbitMQInMemoryBroker struct { - buffer chan amqp.Delivery - declaredQueues []string - connectCount atomic.Int32 - closeCount atomic.Int32 + buffer chan amqp.Delivery + declaredQueues []string + connectCount atomic.Int32 + closeCount atomic.Int32 + lastMsgMetadata *amqp.Publishing // Add this field to capture the last message metadata } func (r *rabbitMQInMemoryBroker) Qos(prefetchCount, prefetchSize int, global bool) error { @@ -482,7 +483,17 @@ func (r *rabbitMQInMemoryBroker) PublishWithDeferredConfirmWithContext(ctx conte return nil, errors.New(errorChannelConnection) } - r.buffer <- createAMQPMessage(msg.Body) + // Store the last message metadata for inspection in tests + r.lastMsgMetadata = &msg + + // Use a non-blocking send or a separate goroutine to prevent deadlock + // when there's no consumer reading from the buffer + select { + case r.buffer <- createAMQPMessage(msg.Body): + // Message sent successfully + default: + // Buffer is full or there's no consumer, but we don't want to block + } return nil, nil } @@ -525,3 +536,76 @@ func (r *rabbitMQInMemoryBroker) Close() error { func (r *rabbitMQInMemoryBroker) IsClosed() bool { return r.connectCount.Load() <= r.closeCount.Load() } + +// TestPublishMetadataProperties tests that message metadata properties are correctly passed to the broker +func TestPublishMetadataProperties(t *testing.T) { + broker := newBroker() + pubsubRabbitMQ := newRabbitMQTest(broker) + metadata := pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + metadataHostnameKey: "anyhost", + metadataConsumerIDKey: "consumer", + }, + }} + err := pubsubRabbitMQ.Init(t.Context(), metadata) + require.NoError(t, err) + + topic := "metadatatest" + + // Create a consumer for the test to prevent channel deadlock + messageHandler := func(ctx context.Context, msg *pubsub.NewMessage) error { + return nil + } + err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topic}, messageHandler) + require.NoError(t, err) + + // Test messageID + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "messageID": "msg-123", + }, + }) + require.NoError(t, err) + assert.Equal(t, "msg-123", broker.lastMsgMetadata.MessageId) + + // Test correlationID + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "correlationID": "corr-456", + }, + }) + require.NoError(t, err) + assert.Equal(t, "corr-456", broker.lastMsgMetadata.CorrelationId) + + // Test Type + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "type": "mytype", + }, + }) + require.NoError(t, err) + assert.Equal(t, "mytype", broker.lastMsgMetadata.Type) + + // Test all properties together + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "messageID": "msg-789", + "correlationID": "corr-789", + "type": "complete-type", + "contentType": "application/json", + }, + }) + require.NoError(t, err) + assert.Equal(t, "msg-789", broker.lastMsgMetadata.MessageId) + assert.Equal(t, "corr-789", broker.lastMsgMetadata.CorrelationId) + assert.Equal(t, "complete-type", broker.lastMsgMetadata.Type) + assert.Equal(t, "application/json", broker.lastMsgMetadata.ContentType) +} diff --git a/tests/certification/bindings/rabbitmq/components/metadata/rabbitmq.yaml b/tests/certification/bindings/rabbitmq/components/metadata/rabbitmq.yaml new file mode 100644 index 0000000000..5472860810 --- /dev/null +++ b/tests/certification/bindings/rabbitmq/components/metadata/rabbitmq.yaml @@ -0,0 +1,16 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: metadata-binding +spec: + type: bindings.rabbitmq + version: v1 + metadata: + - name: queueName + value: metadataQueue + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deleteWhenUnused + value: false \ No newline at end of file diff --git a/tests/certification/bindings/rabbitmq/rabbitmq_test.go b/tests/certification/bindings/rabbitmq/rabbitmq_test.go index dbd2b3ec74..44fe5f8ad6 100644 --- a/tests/certification/bindings/rabbitmq/rabbitmq_test.go +++ b/tests/certification/bindings/rabbitmq/rabbitmq_test.go @@ -19,6 +19,7 @@ import ( "crypto/x509" "fmt" "log" + "net/url" "os" "strconv" "testing" @@ -614,6 +615,102 @@ func amqpMtlsExternalAuthReady(url string) flow.Runnable { } } +func TestRabbitMQMetadataProperties(t *testing.T) { + messages := watcher.NewUnordered() + + ports, _ := dapr_testing.GetFreePorts(3) + grpcPort := ports[0] + httpPort := ports[1] + appPort := ports[2] + + // Define the test values for metadata with fixed IDs + const messageData = "metadata-test-message" + const msgID = "msg-id-123" + const corrID = "corr-id-456" + const msgType = "test-type" + const contentType = "application/json" + + test := func(ctx flow.Context) error { + client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort)) + require.NoError(t, err, "Could not initialize dapr client.") + + metadata := map[string]string{ + "messageID": msgID, + "correlationID": corrID, + "type": msgType, + "contentType": contentType, + } + + ctx.Log("Invoking binding with metadata properties!") + req := &daprClient.InvokeBindingRequest{ + Name: "metadata-binding", + Operation: "create", + Data: []byte(messageData), + Metadata: metadata, + } + + err = client.InvokeOutputBinding(ctx, req) + require.NoError(ctx, err, "error publishing message with metadata") + + // Assertion on the data and metadata. + messages.ExpectStrings(messageData) + messages.Assert(ctx, time.Minute) + + return nil + } + + application := func(ctx flow.Context, s common.Service) (err error) { + // Setup the input binding endpoint. + err = multierr.Combine(err, + s.AddBindingInvocationHandler("metadata-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + msg := string(in.Data) + messages.Observe(msg) + + // Log the received metadata for debugging + ctx.Logf("Got message: %s with metadata: %+v", msg, in.Metadata) + + // Alternative approach with more formatted output + metadataStr := "" + for k, v := range in.Metadata { + if metadataStr != "" { + metadataStr += ", " + } + metadataStr += fmt.Sprintf("%s=%s", k, v) + } + ctx.Logf("Message metadata details: {%s}", metadataStr) + + // TODO: should the header be written with dash? E.g. "message-id" instead of "Messageid" + // TODO: Should all content always be URL-encoded? E.g. "application/json" instead of "application/json" + // Verify all metadata was correctly passed using proper assertions + require.Equal(t, msgID, in.Metadata["Messageid"], "messageID should match expected value") + require.Equal(t, corrID, in.Metadata["Correlationid"], "correlationID should match expected value") + decodedContentType, err := url.QueryUnescape(in.Metadata["Contenttype"]) + require.NoError(t, err, "failed to URL-decode content-type") + require.Equal(t, contentType, decodedContentType, "contentType should match expected value") + require.Equal(t, msgType, in.Metadata["Type"], "type should match expected value") + + return []byte("{}"), nil + })) + return err + } + + flow.New(t, "rabbitmq metadata properties certification"). + Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step("wait for rabbitmq readiness", + retry.Do(time.Second, 30, amqpReady(rabbitMQURL))). + Step(app.Run("metadataApp", fmt.Sprintf(":%d", appPort), application)). + Step(sidecar.Run("metadataSidecar", + append(componentRuntimeOptions(), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)), + embedded.WithComponentsPath("./components/metadata"), + )..., + )). + Step("send with metadata and verify", test). + Run() +} + func componentRuntimeOptions() []embedded.Option { log := logger.NewLogger("dapr.components")