Skip to content

Commit d08cdad

Browse files
committed
fix(iot-device): Fix for retrieving message system properties over Http
1 parent 5e0f250 commit d08cdad

13 files changed

+128
-100
lines changed

e2e/test/AzureSecurityCenterForIoTSecurityMessageE2ETests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private async Task SendSingleSecurityMessageAsync(
167167
Client.Message testMessage = ComposeD2CSecurityTestMessage(out string eventId, out string payload, out string p1Value);
168168
await deviceClient.SendEventAsync(testMessage).ConfigureAwait(false);
169169

170-
await ValidateEventAsync(deviceId, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
170+
await ValidateEventAsync(deviceId, testMessage, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
171171
}
172172

173173
private async Task SendSingleSecurityMessageModuleAsync(
@@ -180,17 +180,18 @@ private async Task SendSingleSecurityMessageModuleAsync(
180180
Client.Message testMessage = ComposeD2CSecurityTestMessage(out string eventId, out string payload, out string p1Value);
181181
await moduleClient.SendEventAsync(testMessage).ConfigureAwait(false);
182182

183-
await ValidateEventAsync(deviceId, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
183+
await ValidateEventAsync(deviceId, testMessage, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
184184
}
185185

186186
private async Task ValidateEventAsync(
187187
string deviceId,
188+
Client.Message message,
188189
string eventId,
189190
string payload,
190191
string p1Value,
191192
AzureSecurityCenterForIoTLogAnalyticsClient logAnalticsTestClient)
192193
{
193-
bool isReceivedEventHub = EventHubTestListener.VerifyIfMessageIsReceived(deviceId, payload, p1Value, TimeSpan.FromSeconds(10));
194+
bool isReceivedEventHub = EventHubTestListener.VerifyIfMessageIsReceived(deviceId, message, payload, p1Value, TimeSpan.FromSeconds(10));
194195
Assert.IsFalse(isReceivedEventHub, "Security message received in customer event hub.");
195196
bool isReceivedOms = await logAnalticsTestClient.IsRawEventExist(deviceId, eventId).ConfigureAwait(false);
196197
Assert.IsTrue(isReceivedOms, "Security message was not received in customer log analytics");

e2e/test/CombinedClientOperationsPoolAmqpTests.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ private async Task DeviceCombinedClientOperationsAsync(
122122
// Initialize service client for service-side operations
123123
using ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(Configuration.IoTHub.ConnectionString);
124124

125-
// Message payload for C2D operation
126-
var messagesSent = new Dictionary<string, List<string>>();
125+
// Message payload and properties for C2D operation
126+
var messagesSent = new Dictionary<string, Tuple<Message, string>>();
127127

128128
// Twin properties
129129
var twinPropertyMap = new Dictionary<string, List<string>>();
@@ -134,10 +134,10 @@ private async Task DeviceCombinedClientOperationsAsync(
134134

135135
// Send C2D Message
136136
s_log.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Send C2D for device={testDevice.Id}");
137-
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
137+
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
138138
using (msg)
139139
{
140-
messagesSent.Add(testDevice.Id, new List<string> { payload, p1Value });
140+
messagesSent.Add(testDevice.Id, Tuple.Create(msg, payload));
141141
Task sendC2dMessage = serviceClient.SendAsync(testDevice.Id, msg);
142142
initOperations.Add(sendC2dMessage);
143143

@@ -170,10 +170,11 @@ private async Task DeviceCombinedClientOperationsAsync(
170170

171171
// C2D Operation
172172
s_log.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Operation 2: Receive C2D for device={testDevice.Id}");
173-
List<string> msgSent = messagesSent[testDevice.Id];
174-
string payload = msgSent[0];
175-
string p1Value = msgSent[1];
176-
Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value);
173+
Tuple<Message, string> msgSent = messagesSent[testDevice.Id];
174+
Message msg = msgSent.Item1;
175+
string payload = msgSent.Item2;
176+
177+
Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msg, payload);
177178
clientOperations.Add(verifyDeviceClientReceivesMessage);
178179

179180
// Invoke direct methods

e2e/test/EventHubTestListener.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ private EventHubTestListener()
4242
}
4343

4444
// verify required message is present in the dictionary
45-
public static bool VerifyIfMessageIsReceived(string deviceId, string payload, string p1Value, TimeSpan? maxWaitTime = null)
45+
public static bool VerifyIfMessageIsReceived(string deviceId, Client.Message message, string payload, string p1Value, TimeSpan? maxWaitTime = null)
4646
{
4747
if (!maxWaitTime.HasValue)
4848
{
4949
maxWaitTime = s_maximumWaitTime;
5050
}
5151

52-
s_log.WriteLine($"Expected payload: deviceId={deviceId}; payload={payload}; property1={p1Value}");
52+
s_log.WriteLine($"Expected payload: deviceId={deviceId}; messageId = {message.MessageId}, userId={message.UserId}, payload={payload}; property1={p1Value}");
5353

5454
bool isReceived = false;
5555

@@ -64,7 +64,7 @@ public static bool VerifyIfMessageIsReceived(string deviceId, string payload, st
6464
continue;
6565
}
6666

67-
isReceived = VerifyTestMessage(eventData, deviceId, p1Value);
67+
isReceived = VerifyTestMessage(eventData, deviceId, message, p1Value);
6868
}
6969

7070
sw.Stop();
@@ -101,7 +101,7 @@ private static string GetEventDataBody(EventData eventData)
101101
#endif
102102
}
103103

104-
private static bool VerifyTestMessage(EventData eventData, string deviceName, string p1Value)
104+
private static bool VerifyTestMessage(EventData eventData, string deviceName, Client.Message message, string p1Value)
105105
{
106106
#if NET451
107107
var connectionDeviceId = eventData.SystemProperties["iothub-connection-device-id"].ToString();

e2e/test/FaultInjectionPoolAmqpTests.MessageReceiveFaultInjectionPoolAmqpTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,15 +618,15 @@ private async Task ReceiveMessageRecoveryPoolOverAmqp(
618618

619619
Func<DeviceClient, TestDevice, Task> testOperation = async (deviceClient, testDevice) =>
620620
{
621-
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
621+
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
622622
_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}: Sending message to device {testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
623623
await serviceClient.SendAsync(testDevice.Id, msg)
624624
.ConfigureAwait(false);
625625

626626
_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
627627
await deviceClient.OpenAsync()
628628
.ConfigureAwait(false);
629-
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value)
629+
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msg, payload)
630630
.ConfigureAwait(false);
631631
};
632632

e2e/test/FaultInjectionPoolAmqpTests.MessageSendFaultInjectionPoolAmqpTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -866,12 +866,12 @@ private async Task SendMessageRecoveryPoolOverAmqpAsync(
866866
_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}: Preparing to send message for device {testDevice.Id}");
867867
await deviceClient.OpenAsync().ConfigureAwait(false);
868868

869-
(Client.Message testMessage, string messageId, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
869+
(Client.Message testMessage, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
870870

871871
_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}.{testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
872872
await deviceClient.SendEventAsync(testMessage).ConfigureAwait(false);
873873

874-
bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, payload, p1Value);
874+
bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, testMessage, payload, p1Value);
875875
Assert.IsTrue(isReceived, $"Message is not received for device {testDevice.Id}.");
876876
};
877877

e2e/test/MessageFeedbackE2ETests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private static async Task CompleteMessageMixOrder(TestDeviceType type, Client.Tr
5858
var messages = new List<Client.Message>();
5959
for (int i = 0; i < MESSAGE_COUNT; i++)
6060
{
61-
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
61+
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
6262
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
6363
Client.Message message = await deviceClient.ReceiveAsync(TIMESPAN_ONE_MINUTE).ConfigureAwait(false);
6464
if (message == null)

e2e/test/MessageReceiveE2EPoolAmqpTests.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,15 @@ private async Task ReceiveMessagePoolOverAmqpAsync(
114114
int devicesCount,
115115
ConnectionStringAuthScope authScope = ConnectionStringAuthScope.Device)
116116
{
117-
var messagesSent = new Dictionary<string, List<string>>();
117+
var messagesSent = new Dictionary<string, Tuple<Message, string>>();
118118

119119
// Initialize the service client
120120
ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(Configuration.IoTHub.ConnectionString);
121121

122122
Func<DeviceClient, TestDevice, Task> initOperation = async (deviceClient, testDevice) =>
123123
{
124-
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
125-
messagesSent.Add(testDevice.Id, new List<string> { payload, p1Value });
124+
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
125+
messagesSent.Add(testDevice.Id, Tuple.Create(msg, payload));
126126

127127
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
128128
};
@@ -132,11 +132,8 @@ private async Task ReceiveMessagePoolOverAmqpAsync(
132132
_log.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
133133
await deviceClient.OpenAsync().ConfigureAwait(false);
134134

135-
List<string> msgSent = messagesSent[testDevice.Id];
136-
string payload = msgSent[0];
137-
string p1Value = msgSent[1];
138-
139-
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value).ConfigureAwait(false);
135+
Tuple<Message, string> msgSent = messagesSent[testDevice.Id];
136+
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
140137
};
141138

142139
Func<Task> cleanupOperation = async () =>

e2e/test/MessageReceiveE2ETests.cs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ namespace Microsoft.Azure.Devices.E2ETests
2020
public partial class MessageReceiveE2ETests : IDisposable
2121
{
2222
private static readonly string s_devicePrefix = $"E2E_{nameof(MessageReceiveE2ETests)}_";
23+
2324
private static readonly TestLogging s_log = TestLogging.GetInstance();
2425
private static readonly TimeSpan s_oneMinute = TimeSpan.FromMinutes(1);
2526
private static readonly TimeSpan s_oneSecond = TimeSpan.FromSeconds(1);
@@ -220,30 +221,33 @@ public async Task Message_DeviceReceiveMessageOperationTimeout_MqttWs()
220221
await ReceiveMessageInOperationTimeoutAsync(TestDeviceType.Sasl, Client.TransportType.Mqtt_WebSocket_Only).ConfigureAwait(false);
221222
}
222223

223-
public static (Message message, string messageId, string payload, string p1Value) ComposeC2dTestMessage()
224+
public static (Message message, string payload, string p1Value) ComposeC2dTestMessage()
224225
{
225226
var payload = Guid.NewGuid().ToString();
226227
var messageId = Guid.NewGuid().ToString();
227228
var p1Value = Guid.NewGuid().ToString();
229+
var userId = Guid.NewGuid().ToString();
228230

229-
s_log.WriteLine($"{nameof(ComposeC2dTestMessage)}: messageId='{messageId}' payload='{payload}' p1Value='{p1Value}'");
231+
s_log.WriteLine($"{nameof(ComposeC2dTestMessage)}: messageId='{messageId}' userId='{userId}' payload='{payload}' p1Value='{p1Value}'");
230232
var message = new Message(Encoding.UTF8.GetBytes(payload))
231233
{
232234
MessageId = messageId,
235+
UserId = userId,
233236
Properties = { ["property1"] = p1Value }
234237
};
235238

236-
return (message, messageId, payload, p1Value);
239+
return (message, payload, p1Value);
237240
}
238241

239-
public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType transport, DeviceClient dc, string deviceId, string payload, string p1Value)
242+
public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType transport, DeviceClient dc, string deviceId, Message message, string payload)
240243
{
244+
string receivedMessageDestination = $"/devices/{deviceId}/messages/deviceBound";
245+
241246
var sw = new Stopwatch();
242247
bool received = false;
243248

244249
sw.Start();
245250

246-
247251
while (!received && sw.ElapsedMilliseconds < FaultInjection.RecoveryTimeMilliseconds)
248252
{
249253
Client.Message receivedMessage = null;
@@ -277,14 +281,19 @@ public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType tran
277281
// ignore exception from CompleteAsync
278282
}
279283

284+
Assert.AreEqual(receivedMessage.MessageId, message.MessageId, "Recieved message Id is not what was sent by service");
285+
Assert.AreEqual(receivedMessage.UserId, message.UserId, "Recieved user Id is not what was sent by service");
286+
Assert.AreEqual(receivedMessage.To, receivedMessageDestination, "Recieved message destination is not what was sent by service");
287+
280288
string messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
281289
s_log.WriteLine($"{nameof(VerifyReceivedC2DMessageAsync)}: Received message: for {deviceId}: {messageData}");
282290
if (Equals(payload, messageData))
283291
{
284292
Assert.AreEqual(1, receivedMessage.Properties.Count, $"The count of received properties did not match for device {deviceId}");
285293
System.Collections.Generic.KeyValuePair<string, string> prop = receivedMessage.Properties.Single();
286-
Assert.AreEqual("property1", prop.Key, $"The key \"property1\" did not match for device {deviceId}");
287-
Assert.AreEqual(p1Value, prop.Value, $"The value of \"property1\" did not match for device {deviceId}");
294+
string propertyKey = "property1";
295+
Assert.AreEqual(propertyKey, prop.Key, $"The key \"property1\" did not match for device {deviceId}");
296+
Assert.AreEqual(message.Properties[propertyKey], prop.Value, $"The value of \"property1\" did not match for device {deviceId}");
288297
received = true;
289298
}
290299
}
@@ -422,11 +431,11 @@ private async Task ReceiveSingleMessageAsync(TestDeviceType type, Client.Transpo
422431

423432
await serviceClient.OpenAsync().ConfigureAwait(false);
424433

425-
(Message msg, string messageId, string payload, string p1Value) = ComposeC2dTestMessage();
434+
(Message msg, string payload, string p1Value) = ComposeC2dTestMessage();
426435
using (msg)
427436
{
428437
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
429-
await VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value).ConfigureAwait(false);
438+
await VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msg, payload).ConfigureAwait(false);
430439
}
431440

432441
await deviceClient.CloseAsync().ConfigureAwait(false);
@@ -450,7 +459,7 @@ private async Task ReceiveSingleMessageWithCancellationTokenAsync(TestDeviceType
450459

451460
await serviceClient.OpenAsync().ConfigureAwait(false);
452461

453-
(Message msg, string messageId, string payload, string p1Value) = ComposeC2dTestMessage();
462+
(Message msg, string payload, string p1Value) = ComposeC2dTestMessage();
454463
using (msg)
455464
{
456465
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);

e2e/test/MessageReceiveFaultInjectionTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,9 @@ private async Task ReceiveMessageRecovery(
204204

205205
Func<DeviceClient, TestDevice, Task> testOperation = async (deviceClient, testDevice) =>
206206
{
207-
(Message message, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
207+
(Message message, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
208208
await serviceClient.SendAsync(testDevice.Id, message).ConfigureAwait(false);
209-
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value).ConfigureAwait(false);
209+
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, message, payload).ConfigureAwait(false);
210210
};
211211

212212
Func<Task> cleanupOperation = () =>

e2e/test/MessageSendE2EPoolAmqpTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,11 @@ private async Task SendMessagePoolOverAmqp(
130130
s_log.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}: Preparing to send message for device {testDevice.Id}");
131131
await deviceClient.OpenAsync().ConfigureAwait(false);
132132

133-
(Client.Message testMessage, string messageId, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
134-
s_log.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{messageId}' payload='{payload}' p1Value='{p1Value}'");
133+
(Client.Message testMessage, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
134+
s_log.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
135135
await deviceClient.SendEventAsync(testMessage).ConfigureAwait(false);
136136

137-
bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, payload, p1Value);
137+
bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, testMessage, payload, p1Value);
138138
Assert.IsTrue(isReceived, "Message is not received.");
139139
};
140140

0 commit comments

Comments
 (0)