-
Notifications
You must be signed in to change notification settings - Fork 5
V4.1.0/support for rabbitmq #35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Caution Review failedThe pull request is closed. WalkthroughThis update introduces a new RabbitMQ integration for the Savvyio framework, adding both source and functional test projects. It implements point-to-point command queues and publish-subscribe event bus patterns using RabbitMQ, with configurable options and marshaller support. Comprehensive functional tests validate serialization, signing, and concurrent message handling. Changes
Sequence Diagram(s)Command Queue: Send and Receive FlowsequenceDiagram
participant Producer as Command Sender
participant RabbitMqCommandQueue
participant RabbitMQ as RabbitMQ Broker
participant Consumer as Command Receiver
Producer->>RabbitMqCommandQueue: SendAsync(commands)
RabbitMqCommandQueue->>RabbitMQ: Publish messages to queue
Consumer->>RabbitMqCommandQueue: ReceiveAsync()
RabbitMqCommandQueue->>RabbitMQ: Consume messages from queue
RabbitMqCommandQueue-->>Consumer: Yield deserialized messages
Consumer->>RabbitMqCommandQueue: Acknowledge (if enabled)
Event Bus: Publish and Subscribe FlowsequenceDiagram
participant Publisher as Event Publisher
participant RabbitMqEventBus
participant RabbitMQ as RabbitMQ Broker
participant Subscriber as Event Subscriber
Publisher->>RabbitMqEventBus: PublishAsync(event)
RabbitMqEventBus->>RabbitMQ: Publish message to fanout exchange
Subscriber->>RabbitMqEventBus: SubscribeAsync(handler)
RabbitMqEventBus->>RabbitMQ: Bind temporary queue to exchange
RabbitMQ-->>RabbitMqEventBus: Deliver messages to queue
RabbitMqEventBus-->>Subscriber: Invoke handler with deserialized message
Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI ⛔ Files ignored due to path filters (2)
📒 Files selected for processing (85)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Nitpick comments (6)
src/Savvyio.Extensions.RabbitMQ/Savvyio.Extensions.RabbitMQ.csproj (1)
4-5: Include 'rabbitmq' in PackageTags.
For better NuGet discoverability, consider appendingrabbitmq(and/ormessaging) to<PackageTags>.- <PackageTags>azure queue storage event-grid bus</PackageTags> + <PackageTags>azure queue storage event-grid bus rabbitmq messaging</PackageTags>src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueue.cs (1)
59-66: Consider making message persistence configurable.The
Persistentproperty is commented out, which means messages won't survive RabbitMQ restarts. Consider making this configurable through options or documenting why persistence is disabled.await RabbitMqChannel.BasicPublishAsync("", _options.QueueName, true, basicProperties: new BasicProperties() { - //Persistent = true, + Persistent = _options.PersistentMessages ?? true, Headers = new Dictionary<string, object>() { { MessageType, message.GetType().ToFullNameIncludingAssemblyName() } }test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Commands/RabbitMqCommandQueueNewtonsoftJsonSerializerContextTest.cs (2)
30-72: Consider more robust synchronization instead of fixed delays.The test uses fixed delays (
Task.Delay(200)) which could be unreliable in CI environments or under load. Consider using more deterministic synchronization mechanisms.- await Task.Delay(200); // wait briefly to ensure subscription setup + // Wait for subscription to be ready with timeout + var subscriptionReady = new TaskCompletionSource<bool>(); + Task.Run<Task>(async () => + { + subscriptionReady.SetResult(true); + await foreach (var msg in queue.ReceiveAsync().ConfigureAwait(false)) + { + await receivedMessages.Writer.WriteAsync(msg).ConfigureAwait(false); + } + }); + await subscriptionReady.Task.WaitAsync(TimeSpan.FromSeconds(5));
119-165: Enhance bulk test with better concurrency validation.The bulk test sends 100 messages but doesn't validate that all messages are processed correctly in a concurrent scenario. Consider adding timeout handling and ensuring message ordering isn't assumed.
+ var timeout = TimeSpan.FromSeconds(10); + var receivedCount = 0; Task.Run<Task>(async () => { await foreach (var msg in queue.ReceiveAsync().ConfigureAwait(false)) { + Interlocked.Increment(ref receivedCount); await receivedMessages.Writer.WriteAsync(msg).ConfigureAwait(false); + if (receivedCount >= messages.Count) + { + receivedMessages.Writer.Complete(); + break; + } } });test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Commands/RabbitMqCommandQueueJsonSerializerContextTest.cs (1)
144-162: Validate concurrent consumer implementation.The bulk test introduces concurrent consumers but doesn't ensure proper load balancing verification. The count variables
count1andcount2are tracked but not asserted against expected distribution.Consider validating that both consumers processed messages:
TestOutput.WriteLine(count1.ToString()); TestOutput.WriteLine(count2.ToString()); + + // Verify both consumers processed messages (for point-to-point, only one should process each message) + Assert.True(count1 > 0, "First consumer should have processed some messages"); + Assert.True(count2 > 0, "Second consumer should have processed some messages"); + Assert.Equal(100, count1 + count2); // Total should equal sent messagestest/Savvyio.Extensions.RabbitMQ.FunctionalTests/EventDriven/RabbitMqEventBusNewtonsoftJsonSerializerContextTest.cs (1)
28-30: Fix indentation inconsistency.There's an extra space in the constructor declaration that's inconsistent with the coding style used elsewhere.
- public RabbitMqEventBusNewtonsoftJsonSerializerContextTest(ITestOutputHelper output) : base(output) + public RabbitMqEventBusNewtonsoftJsonSerializerContextTest(ITestOutputHelper output) : base(output)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
Directory.Packages.props(2 hunks)Savvyio.sln(3 hunks)src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueue.cs(1 hunks)src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueueOptions.cs(1 hunks)src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs(1 hunks)src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs(1 hunks)src/Savvyio.Extensions.RabbitMQ/RabbitMqMessage.cs(1 hunks)src/Savvyio.Extensions.RabbitMQ/RabbitMqMessageOptions.cs(1 hunks)src/Savvyio.Extensions.RabbitMQ/Savvyio.Extensions.RabbitMQ.csproj(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Assets/CreateMemberCommand.cs(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Assets/MemberCreated.cs(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Commands/RabbitMqCommandQueueJsonSerializerContextTest.cs(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Commands/RabbitMqCommandQueueNewtonsoftJsonSerializerContextTest.cs(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/EventDriven/RabbitMqEventBusJsonSerializerContextTest.cs(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/EventDriven/RabbitMqEventBusNewtonsoftJsonSerializerContextTest.cs(1 hunks)test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensions.RabbitMQ.FunctionalTests.csproj(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
src/Savvyio.Extensions.RabbitMQ/RabbitMqMessageOptions.cs (2)
src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueueOptions.cs (1)
ValidateOptions(59-63)src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs (1)
ValidateOptions(45-49)
src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueueOptions.cs (2)
src/Savvyio.Extensions.RabbitMQ/RabbitMqMessageOptions.cs (3)
RabbitMqMessageOptions(11-53)RabbitMqMessageOptions(29-32)ValidateOptions(49-52)src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs (1)
ValidateOptions(45-49)
src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs (2)
src/Savvyio.Extensions.RabbitMQ/RabbitMqMessageOptions.cs (3)
RabbitMqMessageOptions(11-53)RabbitMqMessageOptions(29-32)ValidateOptions(49-52)src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueueOptions.cs (1)
ValidateOptions(59-63)
src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs (3)
src/Savvyio.Extensions.RabbitMQ/RabbitMqMessage.cs (3)
RabbitMqMessage(13-111)RabbitMqMessage(35-44)Task(66-83)src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs (2)
RabbitMqEventBusOptions(10-50)RabbitMqEventBusOptions(28-30)src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueue.cs (2)
Task(49-68)Task(113-119)
test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Commands/RabbitMqCommandQueueJsonSerializerContextTest.cs (1)
test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Assets/CreateMemberCommand.cs (1)
CreateMemberCommand(7-12)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: call-build (Debug) / 🛠️ Build
- GitHub Check: call-build (Release) / 🛠️ Build
🔇 Additional comments (33)
test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Assets/MemberCreated.cs (2)
1-4: No issues with the using directive or namespace declaration.
5-16: Record implementation looks good.
TheMemberCreatedrecord correctly extendsIntegrationEventand exposes immutable properties for functional tests.src/Savvyio.Extensions.RabbitMQ/Savvyio.Extensions.RabbitMQ.csproj (2)
8-11: Validate PackageReferences.
Using central package management is correct—Cuemon.Extensions.IOandRabbitMQ.Clientwill pick up versions fromDirectory.Packages.props.
13-19: Project references look accurate.
References to core Savvyio projects are in place for the RabbitMQ extension.test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensions.RabbitMQ.FunctionalTests.csproj (2)
7-9: PackageReference usage is correct.
System.Linq.Asyncis specified without a version, leveraging the centralized package management inDirectory.Packages.props.
11-19: ProjectReference paths are valid.
The references target the correct functional and core projects needed for testing the RabbitMQ extension.test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Assets/CreateMemberCommand.cs (2)
1-4: No concerns with the using directive or namespace declaration.
5-19: Command record is well-defined.
CreateMemberCommandcorrectly inherits fromCommandand exposes immutable test parameters.Directory.Packages.props (5)
7-8: Verify AWS SDK version updates.
AWSSDK.SQSandAWSSDK.SimpleNotificationServicehave been bumped—confirm these version changes are intentional and compatible.
13-13: Review Codebelt.Extensions.Xunit.App update.
Upgrading to10.0.2may alter test behavior; ensure there are no regressions.
25-26: Validate test framework package versions.
Microsoft.NET.Test.SdkandMicrosoft.TestPlatform.ObjectModelwere updated; verify that the CI/test runners remain compatible.
28-28: New RabbitMQ.Client package added.
DependencyRabbitMQ.Client(7.1.2) aligns with the new extension; central version management is correctly applied.
36-36: Verify xunit.runner.visualstudio version.
Updating to3.1.1may affect test discovery; ensure the pipeline supports this runner.Savvyio.sln (3)
122-125: LGTM! Project additions follow established patterns.The new RabbitMQ projects are correctly added with proper project type GUIDs and paths.
356-363: LGTM! Build configurations are properly defined.Both Debug and Release configurations are correctly set up for the new RabbitMQ projects.
425-426: LGTM! Solution folder nesting is correct.The projects are appropriately nested under the "src" and "test" solution folders.
src/Savvyio.Extensions.RabbitMQ/RabbitMqMessageOptions.cs (2)
29-32: LGTM! Sensible default configuration.The constructor provides a reasonable default AMQP URL for local development.
49-52: LGTM! Proper validation implementation.The validation correctly ensures AmqpUrl is not null, following the framework's validation patterns.
src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs (3)
10-10: LGTM! Proper inheritance design.The class correctly inherits from
RabbitMqMessageOptionsto reuse common RabbitMQ configuration.
28-30: LGTM! Constructor design follows inheritance best practices.The empty constructor appropriately relies on the base class for common initialization.
45-49: LGTM! Validation follows established pattern.The validation override correctly validates the event bus-specific property before calling base validation, consistent with the pattern used in
RabbitMqCommandQueueOptions.src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueueOptions.cs (3)
10-10: LGTM! Consistent inheritance design.The class properly inherits from
RabbitMqMessageOptions, maintaining consistency withRabbitMqEventBusOptions.
42-50: LGTM! Well-defined command queue properties.Both
QueueNameandAutoAcknowledgeproperties are appropriately documented and typed for command queue functionality.
59-63: LGTM! Validation pattern is consistent.The validation override follows the same pattern as
RabbitMqEventBusOptions, ensuring required properties are validated before calling base validation.src/Savvyio.Extensions.RabbitMQ/RabbitMqMessage.cs (3)
35-44: LGTM! Well-structured constructor with proper validation.The constructor properly validates inputs and initializes the connection factory with the provided AMQP URL.
66-83: LGTM! Thread-safe connection initialization with proper async locking.The double-checked locking pattern is correctly implemented for async scenarios, ensuring only one initialization occurs while avoiding unnecessary lock acquisitions.
106-110: LGTM! Proper async disposal implementation.The disposal order is correct (channel before connection) and null checks prevent exceptions for uninitialized resources.
src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueue.cs (1)
77-111: LGTM! Well-implemented async message consumption with proper acknowledgment handling.The implementation correctly handles message deserialization, metadata attachment, and conditional auto-acknowledgment. The async enumerable pattern provides a clean API for consumers.
src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs (2)
50-65: LGTM! Correct implementation of fanout exchange publishing.The method properly declares a fanout exchange and publishes messages with appropriate metadata.
73-106: LGTM! Well-implemented subscription with temporary queues.The implementation correctly creates temporary queues for each subscriber and uses auto-acknowledgment, which is appropriate for event bus scenarios. The fanout exchange ensures all subscribers receive published messages.
test/Savvyio.Extensions.RabbitMQ.FunctionalTests/EventDriven/RabbitMqEventBusJsonSerializerContextTest.cs (2)
269-276: Excellent use of parallel publishing for stress testing.The bulk test correctly uses
ParallelFactory.ForEachAsyncto publish messages concurrently, which properly tests the event bus under load and validates the publish-subscribe pattern with multiple subscribers.
287-295: Verify publish-subscribe semantics correctly.The assertions properly validate that in a publish-subscribe pattern, both subscribers receive all 100 messages, which is the expected behavior for event bus patterns.
test/Savvyio.Extensions.RabbitMQ.FunctionalTests/EventDriven/RabbitMqEventBusNewtonsoftJsonSerializerContextTest.cs (1)
32-296: Consistent test implementation across serializers.The test methods are well-structured and consistent with the JsonSerializer version, providing comprehensive coverage of event bus functionality with proper validation of publish-subscribe semantics.
...vvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensions.RabbitMQ.FunctionalTests.csproj
Show resolved
Hide resolved
src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs
Outdated
Show resolved
Hide resolved
...xtensions.RabbitMQ.FunctionalTests/Commands/RabbitMqCommandQueueJsonSerializerContextTest.cs
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #35 +/- ##
===========================================
- Coverage 86.69% 75.59% -11.10%
===========================================
Files 158 167 +9
Lines 3284 3483 +199
Branches 332 348 +16
===========================================
- Hits 2847 2633 -214
- Misses 435 848 +413
Partials 2 2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|


This pull request introduces RabbitMQ support to the project by adding new implementations for a command queue and an event bus, along with the necessary configuration options. It also includes updates to package dependencies and solution files to accommodate the new functionality.
RabbitMQ Integration:
RabbitMqCommandQueueclass to implement a point-to-point command queue using RabbitMQ. This includes methods for sending and receiving command messages asynchronously. (src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueue.cs)RabbitMqEventBusclass to implement a publish-subscribe event bus for integration events using RabbitMQ. This includes methods for publishing and subscribing to integration events. (src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs)Configuration Options:
RabbitMqCommandQueueOptionsclass for configuringRabbitMqCommandQueuewith properties such asQueueNameandAutoAcknowledge. (src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueueOptions.cs)RabbitMqEventBusOptionsclass for configuringRabbitMqEventBuswith properties such asExchangeName. (src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBusOptions.cs)Solution and Dependency Updates:
Directory.Packages.propsto include new versions of dependencies and addRabbitMQ.Clientversion7.1.2. (Directory.Packages.props) [1] [2]Savvyio.slnto include new projects forSavvyio.Extensions.RabbitMQand its functional tests. (Savvyio.sln) [1] [2] [3]Summary by CodeRabbit
New Features
Tests
Chores